RbbitMQ(五) -- 100%消息投递消费

777 阅读9分钟

一:前言概述

生产者生产消息到消费者消息消费,中间需要生产者将消息发送到交换器,再由交换器路由到队列存储,然后消费者进行消息消费。在没有任何设置情况下,中间可能存在以下几种情况导致消息丢失:

在这里插入图片描述

  1. 消费者将消息发送到交换器因为RabbitMQ内部原因丢失消息
  2. 交换器将消息路由到队列,因为队列不存在等因素导致消息丢失
  3. 队列中存储的消息在消费者未消费时RabbitMQ服务宕机导致消息丢失
  4. 消费者消费消息时消费者宕机未处理完消息导致消息丢失

针对上述情况,本文将根据每个节点讲述如何操作确保消息投递的可靠性,同时在保障可靠性的情况下可能会引发系列如消息重复等问题,也是本文将会涉及到的重点

二:事务TX

理解数据库的事务就是将多次操作原子化,统一提交回滚实现数据一致性。RabbitMQ事务可能与之前数据库等事务有一定差别,当然也是支持消息的提交与回滚操作。事务是解决生产者发送消息到交换器消息丢失问题的方案之一

2.1 相关操作

RabbitMQ中的事务实现有如下两个个主要步骤:

在这里插入图片描述

  1. 将通信的信道设置为事务模式
  2. 事务提交/事务回滚
2.2 代码示例
        // 将信道设置为事务模式
        channel.txSelect();
        // 发送消息
        try {
            channel.basicPublish(bindingKey, bindingKey, false, false, null, messgae.getBytes("UTF-8"));
            // 提交事务
            channel.txCommit();
        }catch (Exception e){
            // 异常回滚事务,发生异常的时候可以尝试重发或者记录等操作
            channel.txRollback();
        }
2.3 总结

事务机制的确认需要等待上一条消息发送完毕反馈之后才能进行第二条消息发送,这样的操作将会对于使用RabbitMQ而言感觉就是暴殄天物。如果是想要发送多条消息只能循环操作,但是注意如果没有将channel信道设置为事务模式不能进行事务操作,不然会抛出异常。最后一点就是信道设置为事务模式只需要操作一次即可

三:确认Confirm

事务机制对于RabbitMQ性能消耗是灾难性的,针对生产者到交换器消息丢失处理提出了全新的轻量级处理方式。发送发确认机制confirm,该操作编码上会有很多地方都在讲什么等待确认、批量确认、异步确认。一切为了生产,所以本文将只会介绍生产用的异步确认方式

3.1 相关操作

RabbitMQ的生产发送确认实现由以下三个部分构成:

  • 将信道channel设置为确认模式
  • 增加确认监听Listener
  • 处理监听结果
3.2 代码示例
  • 每个通道发送到RabbitMQ的Broker中都会有唯一的编码
  • 生产端最好使用有序队列存储发送的消息,方便确认后的删除
  • 创建Channel通道时可以指定唯一编码,标识该通道
        // 设置confirm消息发送确认机制
        channel.confirmSelect();
        // 增加确认机制监听器
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 成功确认
             * deliveryTag 表示消息的唯一标识
             * multiple 本次确认是否为批量操作
             */
            @Override
            public void handleAck (long deliveryTag, boolean multiple) throws IOException {
                // 删除有序集合中的消息
                if (! multiple){
                    // 根据坐标删除消息
                }else {
                    // 批量删除消息
                }
            }
    
            /**
             * 失败确认
             * deliveryTag 表示消息的唯一标识
             * multiple 本次确认是否为批量操作
             */
            @Override
            public void handleNack (long deliveryTag, boolean multiple) throws IOException {
                // 可以根据deliveryTag做重试操作等
            }
        });
3.3 注意事项

在这里插入图片描述

  • 共存:事务与确认机制不能共存,不然会异常
  • 查验:通过RabbitMQ可视化监控界面可看到Channels栏Mod属性T表示事务,C表示监听确认
  • 有序:因为RabbitMQ生成的序列deliveryTag是由小到大自动递增的,所以最好存储消息的时候考虑到顺序性,更方便通过deliveryTag定位到消息进行操作

四:Mandatory

交换器不存储消息,所有消息都要路由到队列存储。如果中间过程消息丢失,对于生产者而言不设置的情况下是无法知晓的错误。Mandatory实现与Confirm实现类似,通过增加监控监听Listener实现。前面文章消息与队列进阶详细描述过这个参数监听

4.1 相关操作

当消息到达交换器,但是没有匹配队列路由存储时。若通过Mandatory实现监听处理则需要如下几个处理过程:

  1. 发送消息basicPublish()时将设置mandatory参数为true
  2. 为信道channel增加MandatoryListener监听
4.2 代码示例
  • 首先可以看到因为bindingKey与routingKey不一致,消息不能路由到队列
  • 然后可以看到发送消息时将mandatory参数设置为true表示增加mandatory监听
  • 最后可以看到在信道上增加了ReturnLisrtener监听,取得未路由消息相关参数信息
    在这里插入图片描述
4.3 参数详解

ReturnListener中仅仅包含唯一方法handleReturn(),该方法中含有系列参数,参数含义如下表所示:

参数 描述
replyCode 表示本次返回消息原因编码,如312消息未路由
replyText 表示本次返回消息原因描述
exchange 监听器接收到返回消息的交换器
routingKey 本次消息发送的路由键
properties 本次监听到返回消息的属性设置
body 监听到返回消息的消息体

五:备用交换器

Mandatory增加的ReturnListener监听需要在发送消息代码中增加逻辑,这对于追求功能专一性而言不是好消息。通过RabbitMQ也根本检测不到这段逻辑,也不利于后续代码维护。所以提出备用交换器,创建交换器时绑定,当交换器消息未找到路由队列时消息将转发到备用交换器

5.1 相关操作

备用交换器其原理类似于交换器与交换器绑定,需要注意以下几点:

  1. 创建交换器时使用Map参数绑定备用交换器
  2. 备用交换器接收路由到的消息不会更改任何属性,包括routingKey
  3. 可以将备用交换器设置为内置交换器
5.2 代码示例

通过参数Map绑定备用交换器,验证效果将消息发送路由键routingKey设置为备用交换器路由键。可以查看备用交换器创建时的第五个参数,上面也提到最优设置为内置交换器,属性internal

在这里插入图片描述

5.3 测试结果

最后显示备用交换器中有一条消息,证明结果的正确性。备用交换器可以用作消息不能正确路由时的一种解决方案

在这里插入图片描述

六:队列与消息持久化

这个问题在前面相关队列与队列消息的文章中已经详细讲解,为了整个消息投递可靠性的完整,这里再次描述一下队列与队列消息的持久化。注意以点:

单独的队列消息持久化并不能实现消息持久化,同理单独的队列持久化也不能实现消息持久化。需要队列与队列消息同时持久化方可

6.1 队列持久化

持久化即将队列信息写入磁盘持久化保存,当RabbitMQ应用服务故障宕机重启时可以自动进行数据恢复的操作称之为队列持久化。实现只需要在创建队列时将持久化参数设置为true即可,如下所示:

在这里插入图片描述
进入RabbitMQ应用的WEB页面控制台查看该队列标志D,表示持久化。如下所示:
在这里插入图片描述

6.2 队列消息持久化

将RabbitMQ服务应用重启,发现队列恢复,但是队列中消息数据并未恢复。因为队列消息持久化需要在发送消息时进行设置,不然也不会写入磁盘保存。代码如下所示:

在这里插入图片描述

  • 发送消息方法参数列表要求传递BasicProperties,该类使用建造者模式设计,其中deliveryMod表示消息持久化。1 默认值不进行持久化,2 将消息持久化写入磁盘
  • MessageProperties类封装常用系列BasicProperties对象,可以直接使用
6.3 总结

队列持久化 + 队列消息持久化 = 完整持久化,持久化对RabbitMQ应用的性能是一种负担,可以根据数据类型进行范围数据持久化。如订单数据、支付数据等等较为重要的数据可以采用持久化的操作尽量避免消息丢失

七:消费者确认

生产者消息已经投递并路由到队列存储,当消费者消费时消费应用宕机导致消费逻辑不完整的宕机也是保证消息百分百投递消费的关键一环。RabbitMQ针对这一点提供消费者确认机制,配置该特性后,当且仅当消费者确认以后RabbitMQ应用才会删除消息

讲解RabbitMQ消费者确认机制前需要确认默认情况下消费者将自动确认,也就是当消息从RabbitMQ应用服务取出时将被删除,这也是诱发消息丢失的原因。所以为了实现后续手动控制消息确认的逻辑,消费消息时就需要将参数autoAck设置为false

在这里插入图片描述

7.1 basicAck

消息确认,参数包含deliveryTag、multiple。作用与生产者确认Confirm一致:

  • deliveryTag:RabbitMQ应用会为每条消息产生唯一编号,生产者亦或是消费者都需要根据编码进行相关消息操作
  • multiple:批量操作,即将编码小于本次操作编码的消息都进行本次一致的操作
7.2 消息拒绝

消息拒绝有两个API,basicReject()与basicNack(),两者唯一的差距在于前者不能进行multiple的批量操作。两者共同含有以下两个参数属性:

  • deliveryTag:RabbitMQ应用会为每条消息产生唯一编号,生产者亦或是消费者都需要根据编码进行相关消息操作
  • requeue:是否重新放回队列,这里抛弃的消息如果设置了死信转发,将会被路由到配置的死信交换器