RabbitMQ 之消息的可靠性投递

4,337

默认情况下,消息发送端发送消息给 RabbitMQ 后,RabbitMQ 是不会返回任何信息的。 那么我们怎么知道消息是中途丢失了还是到达了 broker 呢?

RabbitMQ 提供了两种确认消息是否投递成功的方法

  • 设置 channel 为 transaction 模式,通过 AMQP 事务机制实现,这也是 AMQP 协议层面提供的解决方案
  • 设置 channel 为 confirm 模式,这是 RabbitMQ 提供的解决方案

两种模式不能共存

channel 的 transaction 模式

RabbitMQ 中与事务机制有关的方法有三个

  • txSelect:用于将当前 channel 设置成 transaction 模式
  • txCommit:用于提交事务
  • txRollback:用于回滚事务

在通过 txSelect 开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在 txCommit 执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过 txRollback 回滚事务了

事务确实能够解决 producer 与 broker 之间消息确认的问题,只有消息成功被 broker 接收,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作,同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能。

RabbitMQ 提供了一个更好的方案,使用 channel 信道的 confirm 模式。

channel 的 confirm 模式

生产者通过调用 channel 的 confirmSelect 方法将 channel 设置为 confirm 模式. 该模式下,所有在该信道上发布的消息都会被分派一个唯一的ID(从1开始),当消息被投递到所有匹配的队列后,broker 就会发送一个(包含消息的唯一 ID 的)确认给发送端, 如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条nack消息,发送端的 Confirm Listener 会去监听应答

broker回传给发送端的确认消息中 deliver-tag 域包含了确认消息的ID,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个ID之前的所有消息都已经得到了处理

confirm模式最大的好处在于他是异步的,生产者可以在等信道返回的同时继续发送下一条消息。

延伸

上面说到“当消息被投递到所有匹配的队列后,broker 就会发送一个(包含消息的唯一 ID 的)确认给发送端”,万一发送确认后, rabbitMq 崩溃了,消息队列中的消息就都没了,这时候发送端还以为消息还在队列中。

为了防止这种情况的发送,rabbitMq 需要对队列和消息进行持久化。

当消息和队列开启持久化之后,确认信息会等到消息写入磁盘之后再发出

Confirm 的三种使用方式

  • 普通确认:每发送一条消息后,调用channel.waitForConfirms()方法,同步等待服务器端confirm。实际上是一种串行confirm了。
  • 批量确认:每发送一批消息后,调用channel.waitForConfirms()方法,同步等待服务器端confirm
  • 异步确认:为channel添加一个监听器,rabbitmq 会回调这个方法,示例代码如下
// 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
    //消息失败处理
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        //deliveryTag;唯一消息标签
        //multiple:是否批量
        System.err.println("-------no ack!-----------");
    }
    //消息成功处理
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------ack!-----------");
    }
});

实际开发中很少会直接使用 RabbitMQ 包,更多时候是使用 spring 提供的 spring-rabbit ,它封装了 RabbitMQ 并将它集成到了 spring 框架中。

这里给出 springBoot 版的异步确认的详细实现,供大家参考。

RabbitMQ confirm 模式之异步确认

批量 confirm 模式 和 异步confirm模式效率更高些,但是批量confirm模式下,如果 rabbitMq 返回失败,那么需要重新发送这一批消息,建议最好还是选择异步confirm模式

超时

RabbitMQ 的响应可能会超时,超时可能是消息没有到达 mq ,也有可能是网络延迟导致的。对于响应超时的消息,通常被认定为投递失败。

对于投递失败的消息,需要进行消息补偿

消息补偿

消息补偿由发送端自己设计,常见的设计方案为

数据落库,消息状态打标

如图

  • step1: 发送业务消息前,先将业务数据和消息状态入库,并将消息状态初始化为发送中
  • step2: 发送业务消息,设置超时时间,同时异步监听 RabbitMQ 响应
  • step3: RabbitMQ 返回响应
  • step4: 根据响应结果,更新消息状态,投递成功则将消息状态设置成成功
  • step5: 定时任务找出状态为发送中,且时间超时的
  • stem6: 重新投递
  • stem7: 经过上述步骤多次(通常是3次)循环后,依然失败的,设置消息状态为失败
  • stem8: 人工去解决状态为失败的消息

重复投递

消息补偿机制可能会导致重复投递,重复投递可能导致消费端重复消费。但重复投递又无法完全避免,因此消费端需要防止重复消费

总结

要保证 RabbitMQ 的消息可靠性投递,需要做到以下几点

  • 消息发送端开启 channel 的 confirm 模式
  • 消息发送端异步接收 RabbitMQ 响应
  • RabbitMQ 对队列和消息进行持久化
  • 消息发送端建立消息投递失败的补偿机制

相关文章

RabbitMQ 之消息的可靠性消费

RabbitMQ 持久化