阅读 3775

消息队列之异步消息的基本概念以及ActiveMQ整合Spring的常用用法介绍 | 掘金技术征文

一 简介

(1)异步消息:

所谓异步消息,跟RMI远程调用、webservice调用是类似的,异步消息也是用于应用程序之间的通信。但是它们之间的区别是:

  • RMI、Hession/Burlap、webservice等远程调用机制是同步的。也就是说,当客户端调用远程方法时,客户端必须等到远程方法响应后才能继续执行
  • 异步消息,顾名思义消息是异步发送,消息发送者不需要等待消息消费者处理消息,甚至不需要等待消息投递完成就可以继续发送消息。这是因为消息发送者默认消息接收最终可以收到这条消息并进行处理

(2)Java消息服务(JMS):

Java消息服务(Java Message Service,即:JMS)是Java中关于面向消息中间件(MOM)的API,用于在两个应用程序之间或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持

JMS是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,其作用类似于JDBC

(3)消息代理(message broker)和目的地(destination):

在异步消息中有两个重要的概念,分别是:消息代理和目的地

当我们在一个应用中发送一条消息时,会将该消息移交给一个消息代理(PS:一般是一些消息中间件,如:ActiveMQ )。在这里,消息代理就类似于邮局,消息代理可以确保消息被投递到指定的目的地,同时解放消息发送者,使其能够继续进行其他业务

同样,每条异步消息在被消息发送者发送时都要指定一个目的地(PS:用于区别不同类型的消息),然后消息接收者就可以根据自己的业务需求从指定的目的地(PS:消息还是在消息中间件存放,目的是是用于区别不同类型的消息)获取自己所需的消息并进行处理

(4)队列(queue)与主题(topic):

i)队列(queue):

队列也即是:点对点消息模型

在点对点模型中,每条消息分别只有一个发送者和接收者。也就是说,当消息代理得到发送者发送的消息时,它会将该消息放入到一个队列中。当某一个消息接收者(PS:同一目的地的消息接收者可能存在多个)请求队列中的下一条消息时,消息会从队列中取出并投递给该接收者。之后该条消息将会从队列中删除,这样就可以保证一条消息只投递给一个接收者

点对点消息模型

ii)主题(topic):

主题也即是:发布——订阅消息模型

在发布——订阅消息模型中,每条消息可以由多个接收者接收。也就是说,消息不再是只投递给一个接收者,而是主题的所有订阅者都会收到该消息的副本

发布——订阅消息模型

(5)异步消息的优点:

i)无需等待:

在同步通信中,如果客户端与远程服务频繁通信,或者远程服务响应很慢,就会对客户端应用的性能带来负面影响

当使用JMS发送消息时,客户端不必等待消息被处理,甚至是被投递,客户端只需要将消息发送给消息代理,就可以确信消息会被投递给相应的目的地

因为不需要等待,所以客户端可以继续执行其他任务,这种方式可以有效的节省时间,客户端的性能能够得到极大的提高

ii)面向消息与解耦:

在同步通信中,客户端通过服务接口与远程服务相耦合,如果服务的接口发生变化,那么此服务的所有客户端都需要做相应的改变

当使用JMS发送消息时,发送异步消息是以数据为中心的。这意味着客户端并没有与特定的方法签名绑定,任何可以处理数据的队列或主题订阅者都可以处理由客户端发送的消息,而客户端不必了解远程服务的任何规范

iii)位置独立:

同步RPC服务通常需要网络地址来定位。这意味着客户端无法灵活地适应网络拓扑的改变。如果服务的IP地址改变了,或者服务被配置为监听其他端口,客户端必须进行相应的调整,否则无法访问服务。

与之相反,消息客户端不必知道谁会处理它们的消息,或者服务的位置在哪里。客户端只需要了解需要通过哪个队列或主题来发送消息。因此,只服务能够从队列或主题中获取即可,消息客户端根本不需要关注服务来自哪里

在点对点模型中,可以利用这种位置的独立性来创建消息服务集群。如果客户端不知道服务的位置,并且服务的唯一要求就是可以访问消息代理,那么我们就可以配置多个服务从同一个队列中接收消息。如果服务过载,处理能力不足,我们只需要添加一些新的的服务(接收者)实例来监听相同的队列即可平滑增强其处理能力

在发布一订阅模型中,位置独立性会产生另一种有趣的效应。多个服务可以订阅同一个主题,接收相同消息的副本。但是每一个服务对消息的处理方式却可能不同。例如,假设我们有一组可以共同处理描述新员工信息的消息。一个服务可能会在工资系统中增加该员工,另一个服务则会将新员工增加到公司交流群中,同时还有一个服务为新员工分配内网系统的访问权限。在这里,每一个服务都是基于相同的数据(都是从同一个主题接收而来),但是却各自对数据进行了不同的处理

在上面的内容中,我介绍了一些关于异步消息的基本概念。下面我将介绍基于ActiveMQ框架的JMS消息的发送与接收以及ActiveMQ在Spring框架中的一些常用用法

二 基于JMS的消息发送与接收

(1)ActiveMQ的下载与启动:

在正式开始编写测试实例之前,我们需要做的是ActiveMQ的下载。其官方下载地址是:activemq.apache.org/download.ht…

然后运行:apache-activemq-5.14.1/bin/win64/activemq.bat ,接着保持控制台窗口不关闭,访问:http://127.0.0.1:8161/admin/

注:默认账号密码是:admin/admin


此时,我们可以通过访问菜单中的“ Queues”或者“ Topics”来实时监控队列或者主题类型的消息使用情况


当然,此时因为没有任何消息存在,所以界面是空白的

(2)使用JMS发送消息:

package cn.zifangsky.test.base;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * @author zifangsky
 *
 */
public class JMSProducer {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

        try {
            //连接
            Connection connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //消息目的地
            Destination destination = session.createQueue("hello");
            //消息生产者
            MessageProducer producer = session.createProducer(destination);

            //发送消息
            for(int i=0;i<10;i++){
                //创建一条文本消息
                TextMessage message = session.createTextMessage("ActiveMQ:这是第 " + i + " 条消息");
                //生产者发送消息
                producer.send(message);
            }

            session.commit();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}复制代码

运行上面的代码之后可以发现ActiveMQ的队列监控界面出现了变化:


很显然,生成了10条目的地为“hello”的未被消费的消息

(3)使用JMS接收消息:

package cn.zifangsky.test.base;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * @author zifangsky
 *
 */
public class JMSConsumer {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //连接
            Connection connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //消息目的地
            Destination destination = session.createQueue("hello");
            //消息消费者
            MessageConsumer consumer = session.createConsumer(destination);
            while(true){
                TextMessage message = (TextMessage) consumer.receive();
                if(message != null){
                    System.out.println("接收到消息: " + message.getText());
                }else{
                    break;
                }
            }
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }


    }

}复制代码

运行代码之后,输出如下:

接收到消息: ActiveMQ:这是第 0 条消息
接收到消息: ActiveMQ:这是第 1 条消息
接收到消息: ActiveMQ:这是第 2 条消息
接收到消息: ActiveMQ:这是第 3 条消息
接收到消息: ActiveMQ:这是第 4 条消息
接收到消息: ActiveMQ:这是第 5 条消息
接收到消息: ActiveMQ:这是第 6 条消息
接收到消息: ActiveMQ:这是第 7 条消息
接收到消息: ActiveMQ:这是第 8 条消息
接收到消息: ActiveMQ:这是第 9 条消息

当然,此时观察ActiveMQ的队列监控界面,可以发现这10条消息已经被消费了

注:上面的代码很简单,并且其思路跟JDBC很类似,因此这里就不做过多解释了

三 Spring框架整合ActiveMQ的常见用法

如果写过很多的JDBC代码的话,可以发现使用基本的JMS来发送和接收消息就跟JDBC代码一样,需要每次写很多冗长重复的代码。

针对如何消除冗长和重复的JMS代码,Spring给出的解决方案是JmsTemplate。JmsTemplate可以创建连接、获得会话以及发送和接收消息。这使得我们可以专注于构建要发送的消息或者处理接收到的消息

另外,JmsTemplate可以处理所有抛出的笨拙的JMsException异常。如果在使用JmsTemplate时抛出JMsException异常,JmsTemplate将捕获该异常,然后抛出一个非检查型异常,该异常是Spring自带的JmsException异常的子类

(1)一个简单的队列类型的消息发送和接收实例:

i)activemq.properties:

activemq.ip=127.0.0.1
activemq.username=admin
activemq.passwd=admin复制代码

这个文件配置了ActiveMQ的地址以及认证的账号密码

ii)在Spring的配置文件中加载上面的配置文件:

    <bean id="configProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="locations">
            <list>
                <value>classpath:jdbc.properties</value>
                <value>classpath:activemq.properties</value>
            </list>
        </property>
    </bean>

    <bean id="propertyConfigurer"
        class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer">
            <property name="properties" ref="configProperties" />  
    </bean>复制代码

iii)ActiveMQ的配置文件context_activemq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
            http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
            http://www.springframework.org/schema/context 
            http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd">

    <context:component-scan base-package="cn.zifangsky.activemq" />

    <!-- ActiveMQ 连接工厂 -->
    <!-- <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}"
        password="${activemq.passwd}" /> -->
    <bean id="amqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://${activemq.ip}:61616"/>
        <property name="userName" value="${activemq.username}" />
        <property name="password" value="${activemq.passwd}" />
        <!-- <property name="trustAllPackages" value="true"/> -->
        <property name="trustedPackages">
            <list>
                <value>java.lang</value>
                <value>javax.security</value>
                <value>java.util</value>
                <value>org.apache.activemq</value>
                <value>cn.zifangsky.activemq</value>
                <value>cn.zifangsky.model</value>
            </list>
        </property>
    </bean>

    <!-- Spring Caching连接工厂 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory" />
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 定义Queue类型的JmsTemplate -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即:队列模型 -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="testQueueReceiver1"/>
        <jms:listener destination="test.queue" ref="testQueueReceiver2"/>
    </jms:listener-container>

</beans>复制代码

当然,在web.xml中需要加载该配置文件才行:

    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>
            classpath:context/context.xml
            classpath:context/context_*.xml
        </param-value>
    </context-param>复制代码

在上面的context_activemq.xml文件中,首先是定义了自动扫描cn.zifangsky.activemq 这个包下面的注解,在后面配置的两个接收者:testQueueReceiver1、testQueueReceiver2 的bean就是这样被加载进来的

接着,amqConnectionFactory这个bean配置了ActiveMQ的连接参数(PS:通过配置文件加载进来),以及可信任的可以被序列化的类的包路径

再往后,jmsQueueTemplate这个bean配置了一个JmsTemplate的实例,当然这里定义的是一个队列模型

最后,使用jms:listener-container配置了两个消息监听器,其监听的目的地都是“test.queue”,处理的接收者分别是:testQueueReceiver1 和 testQueueReceiver2

iv)消息发送者:

package cn.zifangsky.activemq.producer;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component("queueSender")
public class QueueSender {

    @Resource(name="jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * 发送一条消息到指定队列
     * @param queueName  队列名称
     * @param message  消息内容
     */
    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }

}复制代码

从上面的代码可以看出,这里仅仅只是使用JmsTemplate的send( )方法创建了一条文本消息

v)两个消息接收者:

QueueReceiver1.java:

package cn.zifangsky.activemq.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component("testQueueReceiver1")
public class QueueReceiver1 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver1收到消息: " + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}复制代码

QueueReceiver2.java:

package cn.zifangsky.activemq.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component("testQueueReceiver2")
public class QueueReceiver2 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver2收到消息: " + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}复制代码

vi)测试:

package cn.zifangsky.test.springjms;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import cn.zifangsky.activemq.producer.QueueSender;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:/context/context.xml","classpath:/context/context_activemq.xml"})
public class TestQueue {
    private final String QUEUENAME = "test.queue";

    @Resource(name="queueSender")
    private QueueSender queueSender;

    @Test
    public void test(){
        for(int i=0;i<10;i++){
            queueSender.send(QUEUENAME, "Hi,这是第 " + (i+1) + " 条消息!");
        }
    }

}复制代码

运行这个单元测试方法之后,可以发现输出结果如下:

QueueReceiver2收到消息: Hi,这是第 1 条消息!
QueueReceiver1收到消息: Hi,这是第 2 条消息!
QueueReceiver2收到消息: Hi,这是第 3 条消息!
QueueReceiver1收到消息: Hi,这是第 4 条消息!
QueueReceiver2收到消息: Hi,这是第 5 条消息!
QueueReceiver1收到消息: Hi,这是第 6 条消息!
QueueReceiver2收到消息: Hi,这是第 7 条消息!
QueueReceiver1收到消息: Hi,这是第 8 条消息!
QueueReceiver2收到消息: Hi,这是第 9 条消息!
QueueReceiver1收到消息: Hi,这是第 10 条消息!

从上面的输出结果可以看出,队列类型的消息只能被某一个接收者接收并处理

(2)简化代码:

上面的例子很显然在发送和接收消息的时候写的代码要比纯粹的JMS要少很多,那么是不是就真的没有更简洁的代码了呢?

答案当然是否,第一是在发送消息的时候使用了JmsTemplate的send( ) 方法来发送消息。其实,除了send( )方法,JmsTemplate还提供了convertAndSend( )方法,与send( ) 方法不同的是,convertAndSend( )方法并不需要MessageCreator作为参数。这是因为convertAndSend( )方法会使用内置的消息转换器(message converter)为我们创建消息

i)改写jmsQueueTemplate这个bean添加默认的目的地:

    <!-- 定义Queue类型的JmsTemplate -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即:队列模型 -->
        <property name="pubSubDomain" value="false" />
        <property name="defaultDestinationName" value="test.queue"/>
    </bean>复制代码

ii)改写消息发送者的send()方法:

改写之后的方法如下所示:

    /**
     * 发送一条消息到指定队列
     * @param message  消息内容
     */
    public void send(final String message){
        jmsTemplate.convertAndSend(message);
    }复制代码

除了上面使用的内置的消息转换器之外,Spring还为通用的转换任务提供了多个消息转换器(org.springframework.jms.support.converter包中)

消息转换器 功能
MappingJackson2MessageConverter 使用Jackson2 JSON库实现消息与JSON格式的相互转换
MarshallingMessageConverter 使用JAXB库实现消息与XML格式之间的相互转换
SimpleMessageConverter 实现String与TextMessage之间的相互转换、字节数组与BytesMessage之间的相互转换、Map与MapMessage之间的相互转换以及Serializable对象与ObjectMessage之间的相互转换(PS:对象的序列化与反序列化)

注:默认情况下,JmsTemplate会在convertAndSend( )方法中使用SimpleMessageConverter这个消息转换器。如果需要手动执行消息转化器的话,可以这样修改:

    <bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.MappingJackson2MessageConverter" />

    <bean id="jmsQueueTemplate2" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <property name="pubSubDomain" value="false" />
        <property name="messageConverter" ref="jmsMessageConverter" />
    </bean>复制代码

好了转回正题,针对第一个实例的简化还可以做其他的工作。比如:消息接收者在处理消息的时候实现了一个MessageListener接口,同时复写了onMessage(Message message) 方法。那么我们是否可以将之简化,改写成一个普通的POJO呢?

i)改写QueueReceiver1.java变成一个普通的POJO:

package cn.zifangsky.activemq.consumer;

import org.springframework.stereotype.Component;

@Component("testQueueReceiver1")
public class QueueReceiver1{

    public void handle(String str){
        System.out.println("QueueReceiver1收到消息: " + str);
    }

}复制代码

从上面的代码可以看出,这里仅仅只定义了一个普通的handle(String str) 方法,完全看不出来任何JMS的痕迹

ii)修改队列消息监听器:

    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="testQueueReceiver1" method="handle"/>
        <jms:listener destination="test.queue" ref="testQueueReceiver2"/>
    </jms:listener-container>复制代码

这里只改写了第一个监听相关配置,手动指定了针对接收到的消息的处理方法。当然,Spring会自动完成消息格式的转化

再次运行单元测试:

    @Test
    public void test(){
        for(int i=0;i<10;i++){
            queueSender.send("Hi,这是第 " + (i+1) + " 条消息!");
        }
    }复制代码

输出略

(3)发布——订阅类型的消息发送和接收实例:

明白了上面的队列类型的消息发送与接收,那么定义一个发布——订阅类型的消息就很简单了,只需要把JmsTemplate的类型改成“pubSubDomain”类型即可:

i)context_activemq.xml文件里面的配置:

添加以下内容:

    <!-- 定义Topic类型的JmsTemplate -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
        <property name="defaultDestinationName" value="test.topic"/>
    </bean>

    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="testTopicReceiver1" method="handle"/>
        <jms:listener destination="test.topic" ref="testTopicReceiver2" method="handle"/>
    </jms:listener-container>复制代码

ii)消息发送者:

package cn.zifangsky.activemq.producer;

import javax.annotation.Resource;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component("topicSender")
public class TopicSender {

    @Resource(name="jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * 发送一条消息到指定队列
     * @param message  消息内容
     */
    public void send(final String message){
        jmsTemplate.convertAndSend(message);
    }
}复制代码

iii)两个消息接收者:

package cn.zifangsky.activemq.consumer;

import org.springframework.stereotype.Component;

@Component("testTopicReceiver1")
public class TopicReceiver1{

    public void handle(String str){
        System.out.println("TopicReceiver1收到消息: " + str);
    }

}复制代码

另一个接收者代码跟上面相似,请自行完成,略

iv)单元测试:

package cn.zifangsky.test.springjms;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import cn.zifangsky.activemq.producer.TopicSender;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:/context/context.xml","classpath:/context/context_activemq.xml"})
public class TestTopic {

    @Resource(name="topicSender")
    private TopicSender topicSender;

    @Test
    public void test(){
        for(int i=0;i<5;i++){
            topicSender.send("Hi,这是第 " + (i+1) + " 条消息!");
        }
    }

}复制代码

输出如下:

TopicReceiver2收到消息: Hi,这是第 1 条消息!
TopicReceiver1收到消息: Hi,这是第 1 条消息!
TopicReceiver1收到消息: Hi,这是第 2 条消息!
TopicReceiver1收到消息: Hi,这是第 3 条消息!
TopicReceiver1收到消息: Hi,这是第 4 条消息!
TopicReceiver2收到消息: Hi,这是第 2 条消息!
TopicReceiver2收到消息: Hi,这是第 3 条消息!
TopicReceiver2收到消息: Hi,这是第 4 条消息!
TopicReceiver2收到消息: Hi,这是第 5 条消息!
TopicReceiver1收到消息: Hi,这是第 5 条消息!

从上面的输出内容可以看出,发布——订阅类型的消息每个接收者都会接收到一份消息的副本

(4)发送和接收对象类型的消息:

如果我们想要发送和接收对象类型的消息,而不是普通的文本消息。其实,因为Spring提供了默认的消息转换器——SimpleMessageConverter。所以我们只需要像发送文本消息那样发送对象消息,关于对象的序列化和反序列化这些步骤Spring会自动帮我们完成

i)修改context_activemq.xml文件:

添加以下内容:

    <bean id="jmsQueueTemplate2" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即:队列模型 -->
        <property name="pubSubDomain" value="false" />
        <property name="defaultDestinationName" value="object.queue"/>
    </bean>复制代码

这里,定义了新的JmsTemplate,其默认的目的地是:object.queue

修改队列监听器,添加:

    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        ...
        <jms:listener destination="object.queue" ref="testQueueReceiver3" method="handle"/>
    </jms:listener-container>复制代码

ii)消息发送者:

package cn.zifangsky.activemq.producer;

import javax.annotation.Resource;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import cn.zifangsky.model.User;

@Component("queueSender2")
public class QueueSender2 {

    @Resource(name="jmsQueueTemplate2")
    private JmsTemplate jmsTemplate;

    /**
     * 发送一条消息到指定队列
     * @param user  一个User类型的实体
     */
    public void send(final User user){
        jmsTemplate.convertAndSend(user);
    }

}复制代码

可以看出,这里的方法参数就不是普通的文本了,而是一个可以被序列化的对象

注:User.java:

package cn.zifangsky.model;

import java.io.Serializable;

public class User implements Serializable{
    private static final long serialVersionUID = 1L;
    private Long id;
    private String username;
    private String password;

    public User() {

    }

    public User(Long id, String username, String password) {
        this.id = id;
        this.username = username;
        this.password = password;
    }

    (getter和setter方法略)

    @Override
    public String toString() {
        return "User [id=" + id + ", username=" + username + ", password=" + password + "]";
    }

}复制代码

iii)消息接收者:

package cn.zifangsky.activemq.consumer;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import cn.zifangsky.model.User;

@Component("testQueueReceiver3")
public class QueueReceiver3{

    public void handle(User user){
        System.out.println("接收到消息: " + user);
    }
}复制代码

可以看到,这里的handle方法的参数时User类型。当然这个User对象是Spring将消息进行反序列化后生成的

iv)测试:

    @Resource(name="queueSender2")
    private QueueSender2 queueSender2;

    @Test
    public void testObject(){
        User u = new User((long) 1,"test","123456");

        queueSender2.send(u);
    }复制代码

输出如下:

接收到消息: User [id=1, username=test, password=123456]

可以看出,我们实际需要做的工作还是很少的,很多繁琐的步骤都由Spring在后台自动完成了

(5)消息被接收到之后进行回复:

有时,为了保障消息的可靠性,通常需要在接收到消息之后给某个消息目的地发送一条确认收到的回复消息。当然,要实现这个功能也很简单,只需要在收到消息之后调用某个消息发送者发送一条确认消息即可

比如上面的QueueReceiver3可以改成这样:

package cn.zifangsky.activemq.consumer;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import cn.zifangsky.activemq.producer.QueueSender;
import cn.zifangsky.model.User;

@Component("testQueueReceiver3")
public class QueueReceiver3{

    @Resource(name="queueSender")
    private QueueSender queueSender;

    public void handle(User user){
        System.out.println("接收到消息: " + user);
        queueSender.send("QueueReceiver3已经收到Object类型的消息。。。");
    }

}复制代码

这样就可以在收到消息之后,使用QueueSender 这个发送者给“test.queue”这个消息目的地发送一条确认消息了(PS:实际情况的处理可能会比这里稍微复杂一点,这里为了测试只是发送了一条文本消息)

注:使用单元测试的时候最后一条消息可能不会打印出来,因为此次单元测试的生命周期结束之后程序就自动停止了。解决办法可以是手动执行一下第一个实例中的那个单元测试,或者启动这个web项目就可以看到效果了

(6)设置多个并行的消息监听器:

在前面介绍队列类型的监听器的时候为了验证一条队列里的消息只能被一个接收者接收,因此添加了两个功能完全一样的接收者:testQueueReceiver1和testQueueReceiver2

其实在实际开发中,为了提高系统的消息处理能力我们完全没必要像这样定义多个功能一样的消息接收者,相反我们只需要在配置监听器的时候使用concurrency这个属性配置多个并行的监听器即可。比如像这样:

    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" concurrency="5" acknowledge="auto">
        <jms:listener destination="test.queue" ref="testQueueReceiver1" method="handle"/>
        <jms:listener destination="object.queue" ref="testQueueReceiver3" method="handle"/>
    </jms:listener-container>复制代码

如果concurrency属性设置一个固定的数字则表示每个消息监听器都会被同时启动指定个数的完全一样的并行监听器来监听消息并转发给消息接收者处理。当然,除了指定固定的数字之外,我们还可以手动指定一个监听器的数目区间,比如:concurrency=”3-5″ ,表示最少打开3个监听器,最多打开5个监听器。消息少时会少打开几个,消息多时会多打开几个,这一过程会自动完成而不需要我们做其他额外的工作

基于ActiveMQ的异步消息的一些常用用法基本上就是这些了。当然,还有一些其他的内容在这里没有介绍到,比如:导出基于JMS的服务、使用AMQP实现消息功能等。限于文章篇幅,这些内容暂且让我放在其他文章单独介绍吧

参考: