阅读 130

深入剖析RabbitMQ——春天框架下实现AMQP高级消息队列协议

前言

消息队列在现今数据量超大,并发量超高的系统中是十分常用的。本文将会对现时最常用到的几款消息队列框架ActiveMQ, RabbitMQ、卡夫卡进行分析对比。
详细介绍RabbitMQ在春天框架下的结构及实现原理,从生产端的事务,回调函数(ConfirmCallback / ReturnCallback)到消费者端的MessageListenerContainer信息接收容器进行详细的分析。通过对RabbitTemplate, SimpleMessageListenerContainer, DirectMessageListenerContainer等常用类型介绍,深入剖析在消息处理各个传输环节中的原理及注意事项。
并举以实例对死信队列,持久化操作进行一一介绍。

一,RabbitMQ与AMQP的关系

1.1 AMQP简介

AMQP(高级消息队列协议高级消息队列协议)是一个消息队列协议,它支持符合条件的客户端和消息代理中间件(消息代理中间件)进行通讯.RabbitMQ则是AMQP协议的实现者,主要用于在分布式系统中信息的存储发送与接收,RabbitMQ的服务器端用Erlang语言编写,客户端支持多种开发语言:Python,。 净、Java、Ruby C, PHP, ActionScript, XMPP,踩等。

1.2 ActiveMQ, RabbitMQ,卡夫卡对比

现在在市场上有ActiveMQ, RabbitMQ,卡夫卡等多个常用的消息队列框架,与其他框架对比起来,RabbitMQ在易用性,扩展性,高可用性,多协议,支持多语言客户端等方面都有不俗表现。

1.2.1 AcitveMQ特点

ActiveMQ是Apache以Java语言开发的消息模型,它完美地支持JMS (Java消息服务)消息服务,客户端支持Java, C, c++, c#, Ruby、Perl、Python、PHP等多种开主发语言,支持OpenWire,跺脚,休息,XMPP, AMQP等多种协议.ActiveMQ采用异步消息传递方式,在设计上保证了多主机集群,客户端——服务器、点对点等模式的有效通信。从开始它就是按照JMS 1.1和J2EE 1.4规范进行开发,实现了消息持久化,XA,事务支撑等功能。经历多年的升级完善,现今已成为Java应用开发中主流的消息解决方案。但相比起RabbitMQ,卡夫卡它的主要缺点表现为资源消耗比较大,吞吐量较低,在高并发的情况下系统支撑能力较弱。如果系统全程使用Java开发,其并发量在可控范围内,或系统需要支持多种不同的协议,使用ActiveMQ可更轻便地搭建起消息队列服务。

1.2.2卡夫卡特点

卡夫卡天生是面向分布式系统开发的消息队列,它具有高性能,容灾性,可动态扩容等特点.Kafka与生俱来的特点在于它会把每个分区的数据都备份到不同的服务器当中,并与饲养员配合,当某个经纪人故障失效时,饲养员服务就会将通知生产者和消费者,从备份服务器进行数据恢复。在性能上卡夫卡也大大超越了传统的ActiveMQ, RabbitMQ,由于卡夫卡集群可支持动态扩容,在负载量到达峰值时可动态增加新的服务器进集群而无需重启服务。但由于卡夫卡属于分布式系统,所以它只能在同一分区内实现消息有序,无法实现全局消息有序。而且它内部的监控机制不够完善,需要安装插件,依赖饲养员进行元数据管理。如果系统属于分布式管理机制,数据量较大且并发量难以预估的情况下,建议使用卡夫卡队列。

1.2.3 RabbitMQ对比

由于ActiveMQ过于依赖JMS的规范而限制了它的发展,所以RabbitMQ在性能和吞吐量上明显会优于ActiveMQ。 由于上市时间较长,在可用性、稳定性、可靠性上RabbitMq会比卡夫卡技术成熟,而且RabbitMq使用Erlang开发,所以天生具备高并发高可用的特点。而卡夫卡属于分布式系统,它的性能,吞吐量,TPS都会比RabbitMq要强。

二,RabbitMQ的实现原理

2.1生产者(生产者),消费者(消费者),服务中心(代理)之间的关系

首先简单介绍RabbitMQ的运行原理,在RabbitMQ使用时,系统会先安装并启动代理服务器,也就是RabbitMQ的服务中心。无论是生产者(生产者),消费者(消费者)都会通过连接池(连接)使用TCP / IP协议(默认)来与BrokerServer进行连接,然后生产者会把交换/队列的绑定信息发送到代理服务器,代理服务器根据交换的类型逻辑选择对应队列,最后把信息发送到与队列关联的对应的消费者。


2.2交换器(交换),队列(队列),信道(频道),绑定(绑定)的概念

2.2.1交换器交换

制片人建立连接后,并非直接将消息投递到队列队列中,而是把消息发送到交换器交易所,由交易所根据不同逻辑把消息发送到一个或多个对应的队列当中。目前交换提供了四种不同的常用类型:扇出,直接,话题,头。
  • 扇出类型
此类型是最为常见的交换器,它会将消息转发给所有与之绑定的队列上,比如,有N个队列与扇出交换器绑定,当产生一条消息时,交易所会将该消息的N个副本分别发给每个队列,类似于广播机制。
  • 直接类型
此类型的交换会把消息发送到Routing_Key完全相等的队列当中。多个Cousumer可以使用相同的关键字进行绑定,类似于数据库的一对多关系。比如,生产者以直接类型的交换推送Routing_Key为直接。 key1的队列,系统再指定多个Cousumer绑定直接。 key1。如此,消息就会被分发至多个不同的Cousumer当中。
  • 主题类型
此类型是最灵活的一种方式配置方式,它可以使用模糊匹配,根据Routing_Key绑定到包含该关键字的不同队列中,比如,生产商使用主题类型的交换分别推送Routing_Key设置为topic.guangdong。 广州,topic.guangdong。 深圳的不同队列,Cousumer只需要把Routing_Key设置为topic.guangdong。 #,就可以把所有消息接收处理。
  • 头类型
该类型的交换器与前面介绍的稍有不同,它不再是基于关键字Routing_Key进行路由,而是基于多个属性进行路由的,这些属性比路由关键字更容易表示为消息的头。也就是说,用于路由的属性是取自于消息头属性,当消息头的值与队列绑定时指定的值相同时,消息就会路由至相应的队列中。

2.2.2队列队列

队列队列是消息的载体,每个消息都会被投入到队列当中,它包含名称、耐用、参数等多个属性,名字用于定义它的名称,当持久(持久化)为真时,队列将会持久化保存到硬盘上。反之为假时,一旦代理服务器被重启,对应的队列就会消失,后面还会有例子作详细介绍。

2.2.3通道通道

当代理服务器使用连接连接生产者/ Cousumer时会使用到信道(频道),一个连接上可以建立多个通道,每个频道都有一个会话任务,可以理解为逻辑上的连接。主要用作管理相关的参数定义,发送消息,获取消息,事务处理等。

2.2.4绑定绑定

绑定的主要用于绑定交换器交换与队列队列之间的对应关系,并记录路由的Routing-Key.Binding信息会保存到系统当中,用于代理服务器信息的分发依据。

三,RabbitMQ应用实例

3.1兔常用类说明

3.1.1 RabbitTemplate类

春天框架已经封装了RabbitTemplate对RabbitMQ的绑定,队列发送、接收进行简化管理

3.2初探RabbitMQ

在官网下载并成功安装完RabbitMQ后,打开默认路径http://localhost: 15672 / # /即可看到RabbitMQ服务中心的管理界面

3.2.1生产者端开发

先在pom中添加RabbitMQ的依赖,并在应用程序。 yml中加入RabbitMQ帐号密码等信息。此例子,我们尝试使用直接交换器把队列发送到不同的消费者。
**********************pom *************************
<project>
        .............
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.0.5.RELEASE</version>
    </dependency>
</project>

****************  application.yml  ****************
spring:
  application:
     name: rabbitMqProducer
  rabbitmq:
    host: localhost 
    port: 5672
    username: admin
    password: 12345678
    virtual-host: /LeslieHost复制代码
首先使用CachingConnectionFactory建立链接,通过BindingBuilder绑定交换,队列,RoutingKey之间的关系。
然后通过空虚convertAndSend(字符串,字符串routingKey、对象对象CorrelationData数据)方法把信息发送到破碎的服务器
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String second="direct.second";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    public final static String RoutingKey2="directKey2";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public Queue queueSecond(){
        return new Queue(second);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME,true,true);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
    
    //利用BindingBuilder绑定Direct与queueSecond
    @Bean
    public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
        return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
    }   
}

@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
    
    @RequestMapping("/send")
    public void send() {
        for(int n=0;n<100;n++){   

            template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm the first queue!   "+String.valueOf(n),getCorrelationData());
            template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm the second queue!  "+String.valueOf(n),getCorrelationData());
        }
    }

     private CorrelationData getCorrelationData(){
        return new CorrelationData(UUID.randomUUID().toString());
    }
}复制代码
此时,打开RabbitMQ管理界面,可看到生产商已经向破碎的服务器的直接。 第一/直接。 第二个两个队列分别发送100个消息

3.2.2消费者端开发

分别建立两个不同的消费者,一个绑定直接。 先别一个绑定直接。 第二,然后通过注解@RabbitListener监听不同的队列,当接到到制片人推送队列时,显示队列信息。
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }  
}

@Configuration
@RabbitListener(queues="direct.first")
public class RabbitMqListener {
    
    @RabbitHandler
    public void handler(String message){
        System.out.println(message);
    }
}

@SpringBootApplication
public class App {
    
    public static void main(String[] args){
        SpringApplication.run(App.class, args);
    }
}复制代码
运行后可以观察到不同的消费者会收到不同队列的消息

如果觉得使用绑定代码绑定过于繁琐,还可以直接在监听类RabbitMqListener中使用@QueueBinding注解绑定
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.second"),
key="directKey2"))
public class RabbitMqListener {
    
    @RabbitHandler
    public void handler(String message){
        System.out.println(message);
    }
}

@SpringBootApplication
public class App {
    
    public static void main(String[] args){
        SpringApplication.run(App.class, args);
    }
}复制代码
运行结果

四、制片人端的消息发送与监

前面一节已经介绍了RabbitMQ的基本使用方法,这一节将从更深入的层面讲述生产商的应用。 试想一下这种的情形,如果因RabbitTemplate发送时交换名称绑定错误,或打破服务器因网络问题或服务负荷过大引发异常,制片人发送的队列丢失,系统无法正常工作。此时,开发人员应该进行一系列应对措施进行监测,确保每个数据都能正常推送到破碎的服务器。有见及此,RabbitMQ专门为大家提供了两种解决方案,一是使用传统的事务模式,二是使用回调函数、下面为大家作详介绍。

4.1生产端的事务管理

在需要使用事务时,可以通过两种方法 第一可以调用通道类的方法以传统模式进行管理、事务开始时调用通道。 txSelect()、信息发送后进行确认通道。 txCommit(),一旦捕捉到异常进行回滚channel.txRollback(),最后关闭事务。
@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
 
    @RequestMapping("/send")
    public void send1(HttpServletResponse response) 
        throws InterruptedException, IOException,  TimeoutException{
        Channel channel=template.getConnectionFactory().createConnection().createChannel(true);
        .......
        try{
            channel.txSelect();
            channel.basicPublish("ErrorExchange", BindingConfig.Routing_Key_First, new AMQP.BasicProperties(),"Nothing".getBytes());
            channel.txCommit();
        }catch(Exception e){
            channel.txRollback();
        }finally{
            channel.close();
        }
        ......
        ......
        ......
    }
}复制代码
第二还可以直接通过RabbitTemplate的配置方法无效setChannelTransacted (bool isTransacted)直接开启事务
public class ProducerController {
    @Autowired
    private ConnectionConfig connection;

    @Autowired
    @Bean
    private RabbitTemplate template(){
        RabbitTemplate template=new RabbitTemplate(connection.getConnectionFactory());
        template.setChannelTransacted(true);
        return template;
    }
 
    @RequestMapping("/send")
    @Transactional(rollbackFor=Exception.class)
    public void send(HttpServletResponse response) throws InterruptedException, IOException,TimeoutException{
        ..........
        ..........
        ..........
    }
}复制代码

4.2利用ConfirmCallback回调确认消息是否成功发送到交易所

使用事务模式消耗的系统资源比较大,系统往往会处理长期等待的状态,在并发量较高的时候也有可能造成死锁的隐患。有见及此,系统提供了轻量级的回调函数方式进行异步处理。 当需要确认消息是否成功发送到交换的时候,可以使用ConfirmCallback回调函数。使用该函数,系统推送消息后,该线程便会得到释放,等交换接收到消息后系统便会异步调用ConfirmCallback绑定的方法进行处理.ConfirmCallback只包含一个方法无效确认(CorrelationData CorrelationData,布尔ack,字符串原因),此方法会把每条数据发送到交换时候的ack状态(成功/失败),导致成败原因,及对应的CorrelationData (CorrelationData只包含一个属性id,是绑定发送对象的唯一标识符)返还到生产商,让生产者进行相应处理。
注意:在绑定ConfirmCallback回调函数前,请先把publisher-confirms属性设置为真实的
spring:
  application:
     name: rabbitmqproducer
  rabbitmq:
    host: 127.0.0.1 
    port: 5672
    username: admin
    password: 12345678
    virtual-host: /LeslieHost复制代码
例如:下面的例子,特意将RabbitTemplate发送时所绑定的交换名称填写为错误名称“ErrorExchange”,造成发送失败,然后在回调函数中检查失败的原因。
生产者端代码:
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        System.out.println(host);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        factory.setPublisherConfirms(true);
        factory.setPublisherReturns(true);
        return factory;
    }
}

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }  
}

@Component
public class MyConfirmCallback implements ConfirmCallback {
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // TODO 自动生成的方法存根
        // TODO 自动生成的方法存根
        if(ack){
            System.out.println(correlationData.getId()+" ack is: true! \ncause:"+cause);
        }else
            System.out.println(correlationData.getId()+" ack is: false! \ncause:"+cause);
    }
}

@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private MyConfirmCallback confirmCallback;

    @RequestMapping("/send")
    public void send() {
        template.setConfirmCallback(confirmCallback);       
        for(int n=0;n<2;n++){   
            template.convertAndSend("ErrorExchange",
                     BindingConfig.RoutingKey1,"I'm the first queue!   "
                     +String.valueOf(n),getCorrelationData());
        }
    }

     private CorrelationData getCorrelationData(){
        return new CorrelationData(UUID.randomUUID().toString());
    }
}复制代码
客户端代码
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.first"),
key="directKey1"))
public class RabbitMqListener {
    
    @RabbitHandler
    public void handler(String message){
        System.out.println(message);
    }
}

@SpringBootApplication
public class App {
    
    public static void main(String[] args){
        SpringApplication.run(App.class, args);
    }
}复制代码
运行结果:

4.3绑定CorrelationData与发送对象的关系

上面的例子当中,CorrelationData只是用一个随机的UUID作为CorrelationID,而在现实的应用场景中,由于ConfirmCallback只反回标识值CorrelationData,而没有把队列里的对象值也一同返回,所以,在推送队列时可以先用键-值保存CorrelationID与所发送信息的关系,这样当ConfirmCallback回调时,就可根据CorrelationID找回对象,作进一步处理。
下面例子,我们把要发送的对象放在虚拟数据数据源类中,用DataRelation记录CorrelationID与发送对象OrderID的关系,然后在回调函数ConfirmCallback中根据CorrelationID查找对应的OrderEntity,如果发送成功,则删除绑定。如果发送失败,可以重新发送或根据情况再作处理。
生产者端代码:
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        System.out.println(host);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        factory.setPublisherConfirms(true);
        factory.setPublisherReturns(true);
        return factory;
    }
}

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    //Exchange 使用 direct 模式     
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
}

@Data
public class OrderEntity implements Serializable{
    private String id;
    private String goods;
    private Double price;
    private Integer count;
    
    public OrderEntity(String id,String goods,Double price,Integer count){
        this.id=id;
        this.goods=goods;
        this.price=price;
        this.count=count;
    }
    
    public OrderEntity(){}
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoodsId(String goods) {
        this.goods = goods;
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }
}

@Component
public class DataSource {
    //加入虚拟数据
    private static List<OrderEntity> list=new ArrayList<OrderEntity>(
            Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
                          new OrderEntity("002","Huwei P30 Plus",5400.00,1),
                          ..........));
    
    public DataSource(){
    }
    
    public List<OrderEntity> getOrderList(){
        return list;
    }
    
    //根据Id获取对应order
    public OrderEntity getOrder(String id){
        for(OrderEntity order:list){
            if(order.getId()==id)
                return order;
        }
        return null;
    }
}

public class DataRelation {
    public static Map map=new HashMap();
    
    //绑定关系
    public static void add(String key,String value){
        if(!map.containsKey(key))
            map.put(key,value);
    }
    
    //返回orderId
    public static Object get(String key){
        if(map.containsKey(key))
            return map.get(key);
        else
            return null;
    }
    
    //根据 orderId 删除绑定关系
    public static void del(String key){
        if(map.containsKey(key))
           map.remove(key);
    }
}

@Component
public class MyConfirmCallback implements ConfirmCallback {
    @Autowired
    private DataSource datasource;
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String correlationId=correlationData.getId();
        //根据 correclationId取回对应的orderId
        String orderId=DataRelation.get(correlationId).toString();
        //在datasource中找回对应的order
        OrderEntity order=datasource.getOrder(orderId);
        
        if(ack){
            System.out.println("--------------------ConfirmCallback-------------------\n"                 
                +" order's ack is true!\nId:"+order.getId()+"  Goods:"+order.getGoods()
                +" Count:"+order.getCount().toString()+"  Price:"+order.getPrice());
            DataRelation.del(correlationId);    //操作完成删除对应绑定
        }else {
            System.out.println(order.getId()+" order's ack is: false! \ncause:"+cause);
            //可在记录日志后把Order推送到队列进行重新发送
            .......
        }
    }
}

@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private MyConfirmCallback confirmCallback;
    @Autowired
    private DataSource dataSource;

    @RequestMapping("/send")
    public void send() throws InterruptedException, IOException{
        //绑定 ConfirmCallback 回调函数
        template.setConfirmCallback(confirmCallback);
 
        for(OrderEntity order:dataSource.getOrderList()){
            CorrelationData correlationData=getCorrelationData();
            //保存 CorrelationId 与 orderId关系
            DataRelation.add(correlationData.getId(), order.getId());
            //把 order 插入队列
            template.convertAndSend("directExchange",BindingConfig.RoutingKey1,order,correlationData);
        }
    }
    
    private CorrelationData getCorrelationData(){
        return new CorrelationData(UUID.randomUUID().toString());
    }
}复制代码
客户端代码
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.first"),
key="directKey1"))
public class RabbitMqListener {
    
    @RabbitHandler
    public void handler(String message){
        System.out.println(message);
    }
}

@SpringBootApplication
public class App {
    
    public static void main(String[] args){
        SpringApplication.run(App.class, args);
    }
}复制代码
运行结果

4.4利用ReturnCallback处理队列队列错误

使用ConfirmCallback函数只能判断消息是否成功发送到交易所,但并不能保证消息已经成功进行队列队列。所以,系统预备了另一个回调函数ReturnCallback来监听队列队列处理的成败。如果队列错误绑定不存在的队列,或者打破服务器瞬间出现问题末能找到对应的队列,系统就会激发生产者端ReturnCallback的回调函数来进行错误处理。ReturnCallback回调接口只包含一个方法无效returnedMessage (replyText消息消息、int replyCode字符串,字符串,字符串routingKey),它会把出错的replyCode, replyText,交换,routingKey等值都一起返还。与ConfirmCallback不同的是,returnedMessage会把队列中的对象保存到消息的正文属性中并返还到回调函数。
注意:在绑定ReturnCallback回调函数前,请先把publisher-returns及强制性属性设置为真的。强制参数默认为假,用于判断了服务器是否把错误的对象返还到制片人。如末进行设置,系统将把错误的消息丢弃。
下面例子我们在调用convertAndSend方法时特意把routingKey设置为ErrorKey,触发ReturnCallback回调,然后在ReturenCallback的回调方法显示replyCode, replyText,交换,routingKey等值,并把队列中对象属性一并显示。
生产者端代码
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        System.out.println(host);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        factory.setPublisherConfirms(true);
        factory.setPublisherReturns(true);
        return factory;
    }
}

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    } 
}

@Data
public class OrderEntity implements Serializable{
    private String id;
    private String goods;
    private Double price;
    private Integer count;
    
    public OrderEntity(String id,String goods,Double price,Integer count){
        this.id=id;
        this.goods=goods;
        this.price=price;
        this.count=count;
    }
    
    public OrderEntity(){}
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoodsId(String goods) {
        this.goods = goods;
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }
}

@Component
public class DataSource {
    //虚拟数据
    private static List<OrderEntity> list=new ArrayList<OrderEntity>(
            Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
                          new OrderEntity("002","Huwei P30 Plus",5400.00,1),
                          ......));
    public DataSource(){
    }
    
    public List<OrderEntity> getOrderList(){
        return list;
    }
    
    //根据Id获取对应order
    public OrderEntity getOrder(String id){
        for(OrderEntity order:list){
            if(order.getId()==id)
                return order;
        }
        return null;
    }
}

@Component
public class MyReturnCallback implements ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, 
            String replyText, String exchange, String routingKey){
        //把messageBody反序列化为 OrderEntity对象
        OrderEntity order=convertToOrder(message.getBody());
        //显示错误原因
        System.out.println("-------------ReturnCallback!------------\n"
            +" exchange:"+exchange+" replyCode:"+String.valueOf(replyCode)
            +" replyText:"+replyText+" key:"+routingKey+"\n OrderId:"+order.getId()
            +"  Goods:"+order.getGoods()+"  Count:"+order.getCount().toString()
            +"  Price:"+order.getPrice()+" ");
    }
    
    //把byte[]反序列化为 OrderEntity对象
    private OrderEntity convertToOrder(byte[] bytes){
        OrderEntity order=null;
        ByteArrayInputStream bis = new ByteArrayInputStream (bytes);        
        ObjectInputStream ois;
        try {
            ois = new ObjectInputStream (bis);
            Object obj = ois.readObject();
            order=(OrderEntity)obj;
            ois.close();   
            bis.close(); 
        } catch (IOException | ClassNotFoundException e) {
            // TODO 自动生成的 catch 块
            e.printStackTrace();
        }        
        return order;
    }
}

@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private MyReturnCallback returnCallback;
    @Autowired
    private DataSource dataSource;
 
    
    @RequestMapping("/send")
    public void send() throws InterruptedException, IOException{
        //把 mandatory 属性设定为true
        template.setMandatory(true);
        //绑定 ReturnCallback 回调函数
        template.setReturnCallback(returnCallback);
 
        for(OrderEntity order:dataSource.getOrderList()){
            CorrelationData correlationData=getCorrelationData();
            template.convertAndSend("directExchange","ErrorKey",order,correlationData);
        }
    }
    
    private CorrelationData getCorrelationData(){
        return new CorrelationData(UUID.randomUUID().toString());
    }
}复制代码
消费者代码
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.first"),
key="directKey1"))
public class RabbitMqListener {
    
    @RabbitHandler
    public void handler(String message){
        System.out.println(message);
    }
}

@SpringBootApplication
public class App {
    
    public static void main(String[] args){
        SpringApplication.run(App.class, args);
    }
}复制代码
运行结果:

五、消费者消息接收管控

在第四节主要介绍了生产者端的队列发送与监控,它只能管理生产商与代理服务器之间的通信,但并不能确认消费者是否能成功接收到队列,在这节内容将介绍消费者端的队列接收与监听。前面几节里,消费者端都是简单地直接使用RabbitListener对队列进行监听,其实RabbitMQ已经为用户准备了功能更强大的MessageListenerContainer容器用于管理消息,下面将为大家介绍。

5.1 AbstractMessageListenerContainer介绍

AbstractMeessageListenerContainer虚拟类是RabbitMQ封装好的一个容器,本身并没有对消息进行处理,而是把消息的处理方式交给了MessageListener。而它的主要功能是实现MessageListener的绑定,ApplicationContext上下文的绑定,ErrorHandler的错误处理方法的绑定,对消息消费的开始,结束等等默认参数进行配置,让开发人员可以在容器中对消费者实现统一管理.SimpleMessageListenerContainer, DirectMessageLinstenerCoontainer都是它的子类,分别应用于不同的场景,在下面会再作详细介绍。

MessageListener是监听消息最常用的倾听者,它只包含了一个方法无效onMessage(消息消息),这是消息接收最常用的一个方法,开发者只需要实现此方法即可对接收到的消息进行处理。
ChannelAwareMessageListener相当于是MessageListener的一个扩展,包含了方法无效onMessage(消息消息,频道频道),除了对消息进行处理外,还可以对接收此消息的通道进行检测。

5.2 SimpleMessageListenerContainer常用方法

SimpleMessageListenerContainer是最常用的MessageListener容器,它可以通过下面的方法设置默认消费者数量与最大的消费者数量。下面例子中尝试把consurrentConsumers设置为3,把maxConcurrentConsumers设置为4,并同时监控直接模式交换器的direct.first,直接。 第二队列。

通过截图可以看的到,系统默认会为每个队列都创建3个消费者,不同的队列中的消费者是共享相同的3个频道。




当生产者端发送消息时,消费者的实际数量可根据maxConcurrentConsumers的配置限制进行扩展。
生产者端代码
@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String second="direct.second";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    public final static String RoutingKey2="directKey2";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public Queue queueSecond(){
        return new Queue(second);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
    
    //利用BindingBuilder绑定Direct与queueSecond
    @Bean
    public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
        return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
    }   
}

@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
 
    @RequestMapping("/send")
    public void send(HttpServletResponse response) throws InterruptedException, IOException{
        for(Integer n=0;n<100;n++){
                CorrelationData correlationData=getCorrelationData();
                template.convertAndSend("directExchange","directKey1", 
                         "queue1"+"  "+n.toString(),correlationData);
                template.convertAndSend("directExchange","directKey2"," queue2"+"  "+n.toString(),correlationData);            
                Thread.currentThread().sleep(30);
        }
    }

    private CorrelationData getCorrelationData(){
        return new CorrelationData(UUID.randomUUID().toString());
    }
}复制代码
客户端代码:
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String second="direct.second";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    public final static String RoutingKey2="directKey2";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public Queue queueSecond(){
        return new Queue(second);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
    
    //利用BindingBuilder绑定Direct与queueSecond
    @Bean
    public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
        return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
    }   
}
@Configuration
public class SimpleMessListener {
    @Autowired
    private RabbitTemplate template;
    private int index=0;

    @Bean
    public SimpleMessageListenerContainer messageContainer(){
        SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionConfig.getConnectionFactory());
        // 绑定Queue1/Queue2
        container.setQueueNames("direct.first");      
        container.addQueueNames("direct.second");
        //设置默认 consumer 数为3
        container.setConcurrentConsumers(3);
        //设置最大 consumer 数为4
        container.setMaxConcurrentConsumers(4);
        //标记 consumerTag
        container.setConsumerTagStrategy(queue ->  "consumer"+(++index));
        //绑定MessageListener显示接收信息
        container.setMessageListener(new MessageListener(){
            @Override
            public void onMessage(Message message) {
                // TODO 自动生成的方法存根
                Thread thread=Thread.currentThread();
                MessageProperties messProp=message.getMessageProperties();
                try {
                    System.out.println("  ConsumerTag:"+messProp.getConsumerTag()
                            +"  ThreadId is:"+thread.getId()+"  Queue:"+messProp.getConsumerQueue()
                            +"  "+new String(message.getBody(),"UTF-8"));
                } catch (UnsupportedEncodingException e) {
                    // TODO 自动生成的 catch 块
                    e.printStackTrace();
                }
            }
            
        });
        return container;
    }
}复制代码
运行结果

5.3 SimpleMessageListenerContainer的运作原理

在SimpleMessageListenerContainer模式中,无论系统监听多少个队列队列,通道都是共享的,类似上面的例子,4个频道会把接收到不同的队列请求并分发到对应的消费者进行处理。这样做的好处是系统可以通过concurrentConsumers, maxConcurrentConsumers灵活设定当前队列中消费者的数量,系统可以跟据实际需求灵活处理。但由于每个通道都是在固定线程中运行的,一个频道要游走于多个消费者当中,这无疑增加了系统在上下文切换中的开销。下面用系统提供的ChannelAwareMessageListener接口,以更直观的例子说明一下SimpleMessageListenerContainer当中,队列,消费者之间的关系。
生产者端代码
@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String second="direct.second";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    public final static String RoutingKey2="directKey2";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public Queue queueSecond(){
        return new Queue(second);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
    
    //利用BindingBuilder绑定Direct与queueSecond
    @Bean
    public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
        return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
    }   
}

@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
 
    @RequestMapping("/send")
    public void send(HttpServletResponse response) throws InterruptedException, IOException{
        for(Integer n=0;n<100;n++){
                CorrelationData correlationData=getCorrelationData();
                template.convertAndSend("directExchange","directKey1",
                             " queue1"+"  "+n.toString(),correlationData);
                template.convertAndSend("directExchange","directKey2",
                             "queue2"+"  "+n.toString(),correlationData);            
                Thread.currentThread().sleep(30);
        }
    }

    private CorrelationData getCorrelationData(){
        return new CorrelationData(UUID.randomUUID().toString());
    }
}复制代码
客户端代码
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String second="direct.second";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    public final static String RoutingKey2="directKey2";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public Queue queueSecond(){
        return new Queue(second);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
    
    //利用BindingBuilder绑定Direct与queueSecond
    @Bean
    public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
        return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
    }   
}
@Configuration
public class SimpleMessListener {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private ConnectionConfig connectionConfig;
    private int index=0;

    @Bean
    public SimpleMessageListenerContainer messageContainer(){
        SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionConfig.getConnectionFactory());
        // 绑定Queue1/Queue2
        container.setQueueNames("direct.first");      
        container.addQueueNames("direct.second");
        //设置默认 consumer 数为3
        container.setConcurrentConsumers(3);
        //设置最大 consumer 数为4
        container.setMaxConcurrentConsumers(4);
        //标记 consumerTag
        container.setConsumerTagStrategy(queue ->  "consumer"+(++index));
        //绑定ChannelAwareMessageListener显示接收信息
        container.setChannelAwareMessageListener(new ChannelAwareMessageListener(){
            @Override
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
                    throws Exception {
                // TODO 自动生成的方法存根
                // TODO 自动生成的方法存根
                Thread thread=Thread.currentThread();
                System.out.println("Channel:"+channel.getChannelNumber()                            
                        +"  ThreadId is:"+thread.getId()
                        +"  ConsumerTag:"+message.getMessageProperties().getConsumerTag()
                        +"  Queue:"+message.getMessageProperties().getConsumerQueue());
                        
            }
            
        });
        return container;
    }
}复制代码
运行结果:

观察运行结果可以看到:每个频道都在固定的线程中运行,一个频道会向不同的消费者发送队列信息了。解频道,线程,队列,消费者之间的关系,会对SimpleMessageListenerContainer有更深入认识。

5.4 DirectMessageListenerContainer

SimpleMessageListenerContainer是经典的容器,使用通道共享,一旦某个频道关闭或重启,意味着每个队列队列中使用当通道前的消费者都会受到影响。有见及此,在RabbitMQ 2.0后,系统引入了DirectMessageListenerContainer,它允许每个消费者都有各自的对应的通道的通道只管理负责管理当前消费者的通道。这样令消费者运用更灵活,同时线程并没有跟频道绑定,而是由独立的线程池进行管理,这是更好地解决了SimpleMessageListenerContainer中上下文切换所带来的资源消耗问题。
下面的例子,我们尝试使用把consumersPerQueue设置为4,并同时监控直接模式交换的direct.first,直接。 第二队列。

从管理界面可以看的到,系统会为每个消费者都生成一个独立的通道进行管理。

生产者端代码
@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String second="direct.second";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    public final static String RoutingKey2="directKey2";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public Queue queueSecond(){
        return new Queue(second);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
    
    //利用BindingBuilder绑定Direct与queueSecond
    @Bean
    public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
        return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
    }   
}

@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
 
    @RequestMapping("/send")
    public void send(HttpServletResponse response) throws InterruptedException, IOException{
        for(Integer n=0;n<100;n++){
                CorrelationData correlationData=getCorrelationData();
                template.convertAndSend("directExchange","directKey1",
                             " queue1"+"  "+n.toString(),correlationData);
                template.convertAndSend("directExchange","directKey2",
                             "queue2"+"  "+n.toString(),correlationData);            
                Thread.currentThread().sleep(30);
        }
    }

    private CorrelationData getCorrelationData(){
        return new CorrelationData(UUID.randomUUID().toString());
    }
}复制代码
客户端代码
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String second="direct.second";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    public final static String RoutingKey2="directKey2";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public Queue queueSecond(){
        return new Queue(second);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
    
    //利用BindingBuilder绑定Direct与queueSecond
    @Bean
    public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
        return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
    }   
}

@Configuration
public class DirectMessListener {
    @Autowired
    private ConnectionConfig connectionConfig;
    @Autowired
    private RabbitTemplate template;
    private int index=0;
    
    @Bean
    public DirectMessageListenerContainer messageContainer(){
        DirectMessageListenerContainer container=new DirectMessageListenerContainer();
        container.setConnectionFactory(connectionConfig.getConnectionFactory());
        // 设置每个队列的 consumer 数量
        container.setConsumersPerQueue(4);
        container.addQueueNames("direct.first");
        container.addQueueNames("direct.second");
        container.setConsumerTagStrategy(queue -> "consumer"+(++index));
        container.setMessageListener(new ChannelAwareMessageListener(){
            @Override
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
                    throws Exception {
                // TODO 自动生成的方法存根
                // TODO 自动生成的方法存根
                Thread thread=Thread.currentThread();
                
                System.out.println("Channel:"+channel.getChannelNumber()                            
                        +"  ThreadId is:"+thread.getId()
                        +"  ConsumerTag:"+message.getMessageProperties().getConsumerTag()
                        +"  Queue:"+message.getMessageProperties().getConsumerQueue());
              }    
        });
        return container;
    }
}复制代码
通过运行结果进一步可以证实,消费者信息接收是由独立的线程池进行管理的,并没有与通道绑定,每个消费者都有自己单独的通道,即使频道发生问题时,也不会对其他的消费者发生影响,这正是DirectMessageListenerContainer的优胜之处。

5.5消费者的信息接收确认方式

在第四节曾经介绍过在生产者端利用ConfirmCallback / ReturnCallback监控信息的发送,在这节将为大家在消费者端监控信息的接收。
消费者的信息接收确认模式可以通过AcknowledgeMode设定,一共有三种模式:没有,手动,自动,默认是汽车模式。其中没有为系统确认,手动是手动确认。
而汽车为自动模式,系统可以根据执行情况自动发送ack /纳。如果方法未抛出异常,则发送ack。如果抛出异常AmqpRejectAndDontRequeueException顾名思义消息被拒绝且不会重新加入队列。如果方法抛出非AmqpRejectAndDontRequeueException异常,则系统发送纳消息重归队列。
渠道消息接收的常用方法

AcknowledgeMode配置为手动后,用户可通过通道类的空白basicAck(长deliveryTag,布尔多个)方法手动确认消息接收是否成功。
若检测到有异,常可通过空虚basicReject(长deliveryTag,布尔requeue)或无效basicNack(长deliveryTag,布尔多,布尔requeue)确认是否重新把消息推送。
通过配置prefetchCount可设置消费者每次接收到的信息数量,系统默认值为250,这表示当消费者队列接收到250请求其状态皆为unacked时,代理服务器将暂停向消费者发送消息,待消息处理后再继续。
下面例子中我们尝试把prefetchCount设置为10,即每个消费者单次最多接收到的消息为10条,并把consumersPerQueue设置为4,然后把AcknowledgeMode设置为手动,通过手动确认消息接收,一旦发生错误,消息重新加入队列。
生产者端代码
@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String second="direct.second";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    public final static String RoutingKey2="directKey2";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public Queue queueSecond(){
        return new Queue(second);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
    
    //利用BindingBuilder绑定Direct与queueSecond
    @Bean
    public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
        return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
    }   
}

@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
 
    @RequestMapping("/send")
    public void send(HttpServletResponse response) throws InterruptedException, IOException{
        for(Integer n=0;n<100;n++){
                CorrelationData correlationData=getCorrelationData();
                template.convertAndSend("directExchange","directKey1",
                             " queue1"+"  "+n.toString(),correlationData);
                template.convertAndSend("directExchange","directKey2",
                             "queue2"+"  "+n.toString(),correlationData);            
        }
    }

    private CorrelationData getCorrelationData(){
        return new CorrelationData(UUID.randomUUID().toString());
    }
}复制代码
运行后可看到代理服务器每条队列会有100条数据处于待处理状态

客户端代码
@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
public class BindingConfig {
    public final static String first="direct.first";
    public final static String second="direct.second";
    public final static String Exchange_NAME="directExchange";
    public final static String RoutingKey1="directKey1";
    public final static String RoutingKey2="directKey2";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(first);
    }
    
    @Bean
    public Queue queueSecond(){
        return new Queue(second);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(Exchange_NAME);
    }
    
    //利用BindingBuilder绑定Direct与queueFirst
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
    }
    
    //利用BindingBuilder绑定Direct与queueSecond
    @Bean
    public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
        return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
    }   
}

@Configuration
public class DirectMessListener {
    @Autowired
    private ConnectionConfig connectionConfig;
    @Autowired
    private RabbitTemplate template;
    private int index=0;
    
    @Bean
    public DirectMessageListenerContainer messageContainer(){
        DirectMessageListenerContainer container=new DirectMessageListenerContainer();
        container.setConnectionFactory(connectionConfig.getConnectionFactory());
        // 设置每个队列的 consumer 数量
        container.setConsumersPerQueue(4);
        // 设置每个 consumer 每次的接收的消息数量为10个
        container.setPrefetchCount(10);
        // 使用MANUAL进行手动确认      
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.addQueueNames("direct.first");
        container.addQueueNames("direct.second");
        container.setConsumerTagStrategy(queue -> "consumer"+(++index));
        container.setMessageListener(new ChannelAwareMessageListener(){
            @Override
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
                    throws Exception {
                Thread thread=Thread.currentThread();
                MessageProperties prop=message.getMessageProperties();
                try{
                    System.out.println("Channel:"+channel.getChannelNumber()                            
                            +"  ThreadId is:"+thread.getId()
                            +"  ConsumerTag:"+prop.getConsumerTag()
                            +"  Queue:"+prop.getConsumerQueue());
                    //通过Tag单个确认
                    channel.basicAck(prop.getDeliveryTag(), false);
                }catch(Exception ex){
                    //判定单个接收失败,重新加入consumer队列
                    channel.basicReject(prop.getDeliveryTag(), true);
                }
                thread.sleep(1000);
            }    
        });
        return container;
    }
}复制代码
观察信息接收情况,每个消费者一次可处理10条信息,对队列进行分批处理。

六、死信队列

死信队列(Dead-Letter-Exchange)可被看作是死信交换器。当消息在一个队列中变成死信后,它能被重新被发送到特定的交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。消息变成死信一般是由于以下几种情况:
  • 消息被拒绝,requeue被设置为假,可通过上一介绍的空白basicReject (deliveryTag requeue)或无效basicNack (deliveryTag,多个requeue)完成设置;
  • 消息过期;
  • 队列超出最大长度。
其实死信队列DLX也是一个正常的交换器,和一般的交换器没有什么区别,我们可以用一般建立队列的方法,建立一个死信队列。然后建立一个正常的队列,在正常队列中加入参数x-dead-letter-exchange, x-dead-letter-routing-key与死信队列进行绑定,完成绑定后在管理界面特征选项中direct.queue。 第一次会显示DLX DLK。这时当被绑定的队列出现超时,超长,或被拒绝时(注意requeue被设置为假时,对会激发死信),信息就会流入死信队列被处理。
具体的例子生产者端:
@Configuration 
public class BindingConfig {
    public final static String Queue_First="direct.queue.first";
    public final static String Exchange_Name="directExchange";
    public final static String Routing_Key_First="directKey1";
    
    @Bean
    public Queue queueFirst(){
        return new Queue(this.Queue_First);
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(this.Exchange_Name);
    }
    
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
    }
}

@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        System.out.println(host);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Controller
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate template;
    
    @RequestMapping("/send")
    public void send() {
        for(int n=0;n<10;n++){            
            template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello World!   "
                   +String.valueOf(n),getCorrelationData());
        }
    }

     private CorrelationData getCorrelationData(){
        return new CorrelationData(UUID.randomUUID().toString());
    }
}复制代码
客户端
@Configuration
public class BindingConfig {
    //普通队列参数
    public final static String Queue_First="direct.queue.first";
    public final static String Exchange_Name="directExchange";
    public final static String Routing_Key_First="directKey1";
    //死信队列参数
    public final static String Queue_Dead="direct.queue.dead";
    public final static String Exchange_Dead="directDead";
    public final static String Routing_Key_Dead="directDeadKey";
    
    @Bean
    public Queue queueFirst(){
        Map<String, Object> args=new HashMap<String,Object>();
        //声明当前死信的 Exchange
        args.put("x-dead-letter-exchange", this.Exchange_Dead);
         //声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", this.Routing_Key_Dead);
        //把死信队列的参数绑定到当前队列中
        return QueueBuilder.durable(Queue_First).withArguments(args).build();
    }
    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(this.Exchange_Name);
    }
    
    @Bean
    public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
        return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
    }
    
    @Bean
    public Queue queueDead(){
        return new Queue(this.Queue_Dead);
    }
    
    @Bean
    public DirectExchange directExchangeDead(){
        return new DirectExchange(this.Exchange_Dead);
    }
    
    @Bean
    public Binding bindingExchangeDead(Queue queueDead,DirectExchange directExchangeDead){
        return BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead);
    }
}

@Configuration
public class ConnectionConfig {
    @Value("${spring.rabbitmq.host}")
    public String host;
    
    @Value("${spring.rabbitmq.port}")
    public int port;
    
    @Value("${spring.rabbitmq.username}")
    public String username;
    
    @Value("${spring.rabbitmq.password}")
    public String password;
    
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    @Bean
    public ConnectionFactory getConnectionFactory(){
        CachingConnectionFactory factory=new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        return factory;
    }
}

@Configuration
public class DirectMessListener {
    @Autowired
    private ConnectionConfig connectionConfig;
    @Autowired
    private RabbitTemplate template;
    private int index=0,normalIndex=0,deadIndex=0;    
    
    @Bean
    public DirectMessageListenerContainer messageContainer(){
        DirectMessageListenerContainer container=new DirectMessageListenerContainer();
        container.setConnectionFactory(connectionConfig.getConnectionFactory());
        // 设置每个队列的 consumer 数量
        container.setConsumersPerQueue(4);
        // 设置每个 consumer 每次的接收的消息数量
        container.setPrefetchCount(10);
        // 使用MANUAL手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 监听队列
        container.addQueueNames(BindingConfig.Queue_First);
        container.addQueueNames(BindingConfig.Queue_Dead);
        container.setConsumerTagStrategy(queue -> "consumer"+(++index));
         
        container.setMessageListener(new ChannelAwareMessageListener(){
            @Override
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
                    throws Exception {
                MessageProperties prop=message.getMessageProperties();
                if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){
                    System.out.println("This is a normal queue!  "+(++normalIndex));
                    //把当前的队列转送到死信队列中
                    channel.basicReject(prop.getDeliveryTag(), false);                
                }
                if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){
                    System.out.println("This is a dead queue! "+(++deadIndex));
                    //模拟对死信队列处理
                    Thread.currentThread().sleep(5000);
                    .......
                    //处理完毕
                    channel.basicAck(prop.getDeliveryTag(), false);
                }
                
            }
        });
        return container;
    }
}复制代码
通过管理界面可以看,信息会先发送到direct.queue.first,然后被放进死信队列作处理。


运行结果

死信队列最常用的场景可以在订单支付,流程审批等环节,例如在京,淘等平台,当下单成功后,客户要在一定的时间内完成支付操作,否则订单被视作无效,这些业务流程就可以使用死信队列来处理。

七、持久化操作

RabbitMq的持久化操作包含有队列持久化,消息持久化和交换持久化三类。

7.1队列的持久化

队列持久化只需要在队列的构造函数公共队列(字符串名称,布尔耐用)把耐用参数置为正确就可实现。如果队列不设置持久化((耐用默认为false),那么在RabbitMQ服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。

7.2消息持久化

设置了队列持久化以后,当RabbitMQ服务重启之后,队列依然存在,但消息已经消失,可见单单设置队列的持久化而不设置消息持久化显得毫无意义,所以通常列队持久化会与消息持久化共同使用。
在RabbitMQ原生态的框架下,需要把信息属性设置为MessageProperties。 持续的纯文本才会实现消息的持久化。
而在春天框架下,由于在使用回调函数时需要把消息重新返回队列再进行处理,所以消息默认已经是持久化的。

7.3交流的持久化

交换器持久化可通过构造函数公共DirectExchange(字符串名称,布尔耐用,布尔autoDelete)把耐用参数置为正确就可实现,而autoDelete则是指在所在消费者都解除订阅的情况下自动删除。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是消息不再发送到该交易所。对一个长期使用的交换器来说,持久化还是有其必要性的。

本章总结

RabbitMQ发展至今,被越来越多的人认可,这和它在易用性,扩展性、可靠性和高可用性等方面的卓著表现是密不可分的。
相比于传统的ActiveMQ和分布式卡夫卡,它具有自己独有的特点。
希望文章有帮于大家对RabbitMQ消息队列方面有更深入的了解,在不同的开发环境中灵活运用。
由于时间仓促,文章当中有不明确的地方或有错漏敬请点明。
共同探讨!


对JAVA开发有兴趣的朋友欢迎加入QQ群: 833145934 里面资深架构师会分享一些整理好的录制视频录像和BATJ面试题:有春天,MyBatis,网状的源码分析,高并发,高性能,分布式、微服务架构的原理,JVM性能优化,分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多。

共同探讨!



关注下面的标签,发现更多相似文章
评论