`
greemranqq
  • 浏览: 966684 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

spring+activemq - demo2- QUEUE和TOPIC 实现

    博客分类:
  • JMS
阅读更多

一、序言

       JMS 相关的东西已经出来了很久,本想使用阿里的rocketMQ 发现很多没遵循JMS 规范,暂时就用用activeMq,做一些常用的系统解耦 协同工作,这里还是和spring 进行集成,spring 和JMS 配合还是挺好的。

 

二、场景

       A系统产生了一笔订单,那么我们其他B C 系统会拿到订单的基本信息,然后进行金额的计算 以及 用户资料的分析 等等操作,以前的方法是 写个定时任务 扫描最新订单,但是实时性 就没那么高了,而且随着需求得越来越多,导致新进来一笔订单,就会有N个任务进行扫描 分析,扩展很死板,维护也麻烦。

 

 

三、实例代码

       3.1 为了简单,到http://activemq.apache.org/download.html 下载activeMq 的东西,这里用的版本是5.10。

       3.2 暂时先用activemq 自带的服务器,测试先用P2P  1对1 的方式 ,看看效果。

      

       spring xml 代码:

     

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
    <!-- jms 连接工厂 -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <!-- TCP异步传输  -->
        <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=true" />
    </bean>

    <!-- 基本的bean模板 -->
    <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate">
        <!-- 链接工长 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- 发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 -->
        <!-- DeliveryMode.PERSISTENT=2:持久-->
        <property name="deliveryMode" value="1" />
    </bean>
    <!-- 队列的目的地描述 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 通过 构造 设定 队列的名字 -->
        <constructor-arg index="0" value="orderQueue"/>
    </bean>
</beans>

 

    消息发送者代码:

    这里虚拟了一个订单类Order 表示一笔订单,和一个单独的线程Sender 进行消息发送,一共20条消息

   

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.jms.*;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
 * Created by qiqiang on 2014/12/2.
 */
public class JmsPointSender {
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms.xml");
        // 获得JMS 模板
        JmsTemplate jmsTemplate = (JmsTemplate)context.getBean("jmsTemplate");
        // 获得发送消息的目的地
        Destination destination = (Destination)context.getBean("queueDestination");
        // 这里模拟 单独另外的消息发送的线程
        JmsSender sender = new JmsSender(jmsTemplate,destination);
        ExecutorService service = Executors.newFixedThreadPool(1);
        service.execute(sender);
    }
}

// 发送消息 的类
class   JmsSender implements Runnable{
    JmsTemplate jmsTemplate;
    Destination destination;

    JmsSender(JmsTemplate jmsTemplate, Destination destination) {
        this.jmsTemplate = jmsTemplate;
        this.destination = destination;
    }
    // 定义个消息发送的条数
    static int i = 0;
    @Override
    public void run() {
        // 这里我们一秒发送一条消息的模式
        for(;i<20;i++){
            // 发送消息
            jmsTemplate.send(destination, new MessageCreator() {
                // 这里的session 会有工厂自动创建
                public Message createMessage(Session session) throws JMSException {
                    Order order = new Order(i, "name"+i);
                    // 消息分为很多种,有字符串 字节  对象等,这里我们使用对象
                    System.out.println("发送条数--------------------------"+i);
                    return session.createObjectMessage(order);
                }
            });
        }

    }
}


// 订单的属性
class  Order implements Serializable {
    private int id;
    private String name;

    Order(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public Order(){}


    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Order{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

 

 

    消息接收者的 类   

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;


import javax.jms.*;


/**
 * Created by qiqiang on 2014/12/9.
 */
public class JmsReceiver {
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms.xml");
        // 获得发送消息的目的地
        Destination destination = (Destination)context.getBean("queueDestination");
        // 获得JMS 模板
        JmsTemplate jmsTemplate = (JmsTemplate)context.getBean("jmsTemplate");
        int i = 0;
        ObjectMessage message = null;
        // 这里写个循环,一直接受消息到结束
        while ( (message = (ObjectMessage)jmsTemplate.receive(destination)) != null){
            System.out.println("收到消息条数:"+i++  +" "+message.getObject());;
        }

    }
}

 

 

 四、执行过程:

        1.解压我们下下来的activemq,然后在apache-activemq-5.1.0\bin 目录下,windows 系统 直接运行activemq.bat,linux 应该有个activemq.sh  的启动项,总之我先运行这个broker,这个东西我们可以认为是消息的一个存放中心,一个中转站.运行成功了 可以从http://localhost:8161/admin/queues.jsp  地址提供的一个页面,里面有消息的信息。

 

         2.执行JmsPointSender ,没出错的话,应该会看到消息发送了20条,并且在queues.jsp 里面能看到如下信息:orderQueue 是我们的Destination,20 就是我们发送的消息数。

         

      

          3.启动JmsReceiver,就会看到消费了20条消息,上面的变化就看到0,1,20,20

          表示消费1个,发送20条,消费20条,还剩0条,消费过程就结束了

 

五、Topic 订阅者消费模式

       刚才我们是1对1 的消费模式,肯定不是不满足我们得需求的, 我们需要一笔订单需要 N 个消费者去处理,因此改变如下:

      

 <!-- 消息订阅模式,在spring xml 里面加,这是发送的 Destination -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 订阅消息的名字 -->
        <constructor-arg index="0" value="orderTopic"/>
    </bean>

 

    由于这种属于发布订阅模式,因此消费者得一直监听,才能收到消息,我们先做一个消费者得监听器:

     

public class ConsumerMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // 很简单,我们直接打印
        System.out.println("topic 收到消息:"+message);
    }
}

 

    然后spring 里面同样要配置监听的情况,为了模拟发布者 和 消费在不同的服务器,我们做写两个xml 文件:spring-jms-consumer.xml,内容除了监听,连接服务器的东西都一样

     

 <!-- jms 连接工厂 -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=false" />
    </bean>

    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="connectionFactory"/>
    </bean>
    <!-- 消息订阅模式 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 订阅消息的名字 -->
        <constructor-arg index="0" value="orderTopic"/>
    </bean>

    <!-- 消息监听,这里可以认为是A服务器的监听,这里特殊 -->
    <bean id="messageListener" class="com.xx.ConsumerMessageListener"/>
    <bean id="listenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <!-- 工厂 目的地 监听器 这里如果用原始activemq 写,这些属性也是必要的 -->
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="messageListener" />
    </bean>

 

    这里再看 发送者的代码

    

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.jms.*;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by qiqiang on 2014/12/9.
 */
public class JmsTopicSender {
    static int i = 1;
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms.xml");
        // 获得JMS 模板,这里都差不多一样
        JmsTemplate jmsTemplate = (JmsTemplate)context.getBean("jmsTemplate");
        // 获得发送消息的目的地
        Destination destination = (Destination)context.getBean("topicDestination");
        // 发送消息
        jmsTemplate.send( destination,new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Order order = new Order(i, "name"+i+":"+session);
                System.out.println("发送条数--------------------------"+i);
                return session.createObjectMessage(order);
            }
        });
    }
}

  

    下面模拟是一个消费者的代码

   

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

/**
 * Created by qiqiang on 2014/12/10.
 */
public class JmsTopicReceiver{
    public static void main(String[] args) throws Exception {
        // 加载消费者监听
        ApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-jms-consumer.xml");
        // 写个死循环,模拟服务器一直运行
        while (true){}
    }
}

 

   最后先启动 消费者JmsTopicReceiver,然后再启动发送,这样模拟先监听 后发送的情况,如果先发送,而没监听 是收不到消息的。

 

小结:

    1.上面只是最基本的测试,有问题再说...

    2.关于queue 可以尝试下 发送10W跳数据看看会出现什么问题,也可以尝试下发送一半突然中断会出现什么问题?

    3.关于topic 模式,假设我发送者发送消息的时候,有一台消费者恰好重启或者挂掉怎么办?仅仅持久化吗?

    这些后面再解决!

    

        

       

  • 大小: 5.5 KB
分享到:
评论

相关推荐

    spring-boot-activemq-demo

    spring boot activemq集成示例,包含queue和topic消息的发送、接收,连接池的支持。

    spring+activemq必备jar包

    spring+activemq必备jar包:activeio-core-3.1.4.jar,activemq-all-5.13.2.jar,activemq-pool-5.13.2.jar,commons-pool2-2.4.2.jar

    ActiveMQ-demo

    ActiveMQ-demo实例代码: /* *生产者启动程序,并发送消息给amq服务器(broker) */ public class Producer { /** * @param args * @throws Exception */ public static void main(String[] args) throws ...

    Spring + ActiveMq + maven demo

    Windowss 下 Spring + ActiveMq + maven 集成配置分为两章,环境搭建之前参考了网上的博客以及同事的配置。

    Spring+ActiveMQ简易demo

    这是我从我工作项目中精简下来的Spring+ActiveMQ建议项目,代码亲测可行,也简单明了,很容易就能嵌入别的项目里面,已经通过这个demo嵌了2个实际项目。直接跑程序就可以发送并接受消息。前提要自己起MQ服务,至于MQ...

    Spring+JMS+ActiveMQ+Tomcat实现消息服务的demo

    基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring 3.2.0,ActiveMQ 5.4.3,Tomcat 6.0.43。本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...

    activemq-spring-5.9.1-javadoc.jar

    标签:activemq-spring-5.9.1-javadoc.jar,activemq,spring,5.9.1,javadoc,jar包下载,依赖包

    spring+activemq

    spring+activemq,示例工程,演示JMS消息通信。。。。。。。

    spring-boot-activemq-demo.zip

    spring-boot 集成activemq代码demo

    spring activeMQ-demo 配置

    spring activeMQ demo 配置

    spring-activeMQ-demo:spring-activeMQ-演示

    spring-activeMQ-演示 spring-activeMQ-演示 怎么跑? 1片spring-activeMQ-demo ``mvn全新安装`'' 2以tomcat7身份运行 3访问

    Spring+ActiveMQ实现,基于Maven

    基于Maven的Spring+ActiveMQ,比较贴合实际生产,只实现了Topic,queue改点配置就行了

    activemq-web-console-5.11.2

    activemq-web-console的默认使用方式是通过在...2.一个是demo,有一些使用jms和activemq的简单例子。 3.还有一个fileserver,用来支持通过activemq发送文件时的中转服务器。blob message时配置的http文件服务器。

    activemq-spring-5.5.1-sources.jar

    标签:activemq-spring-5.5.1-sources.jar,activemq,spring,5.5.1,sources,jar包下载,依赖包

    Spring+ActiveMQ整合实例代码工程

    Spring+ActiveMQ整合实例代码工程,朋友提供给我参考的,我备个份以便下次查阅,也分享给大家,看看对大家有没有帮助了

    activemq-all-5.8.0.jar

    activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载

    ActiveMQ+Spring+Maven Demo

    使用spring jmstemplate写的activemq小demo,浅显易懂。工程下载导入可用(maven 工程) activemq 可直接apache官网下载 传送门http://activemq.apache.org/download.html

    activemq-spring-5.4.2-sources.jar

    标签:activemq-spring-5.4.2-sources.jar,activemq,spring,5.4.2,sources,jar包下载,依赖包

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收 后台消费者、生产者、消息发送接口、发送消息业务类等相关配置

    activemq-demo

    一个用Spring+Activemq实现的消息平台

Global site tag (gtag.js) - Google Analytics