RabbitMQ如何保证消息的可靠性?

2,749 阅读7分钟

RabbitMQ在应用级别为我们提供了确保消息可靠的解决方案,它和RabbitMQ节点确保消息可靠同样重要。

一、Consumer消息确认

当Consumer使用autoAck=true的方式订阅RabbitMQ节点消息的时候,可能由于网络原因也有可能Consuerm在处理消息的时候出现异常,亦或者服务器宕机,都有可能导致消息丢失。 当autoAck=true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费到了这些消息。为了避免这种情况的发生,RabbitMQ为提供了消费端确认的方式处理消息,这个时候我们需要设置autoAck=false

autoAck = false;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});

当我们设置autoAck=false的时候,RabbitMQ节点会等待消费者处理完消息后的确认标记,当RabbitMQ收到确认标记后,才会删除消息。如果RabbitMQ一直没有收到消费者的确认,并且确认消费此消息的消费者已经断开连接,RabbitMQ则会把这个消息重新插入到消息队列的头部,并转发给还存活消费者消费。

RabbitMQ一直没有收到消费者的确认信号,可能是由于这是一个耗时很长的任务,这并不意味着消费者已经挂掉了,所以只有当RabbitMQ确认这个Channel已经真的丢失了,才会重新转发没有被确认的消息。

消费者要确认这条消息被消费了并告诉RabbitMQ,那RabbitMQ是如何标识每条消息的那?答案是deliveryTag

当消费方订阅了RabbitMQ的消息之后,RabbitMQ将会调用basic.deliver向消费方推送消息,该方法带有一个单调递增的正整数的deliveryTag可以唯一的标识这个消息在此channel上的传递。deliveryTag的作用域是channel,所以必须在同一个通道进行消息的确认,否则将会导致一个unknown delivery tag异常。当然我觉得这种情况真的极少会发生。

RabbitMQ为我们提供了三种在消费端确认或者拒绝消息的方法:

  • com.rabbitmq.client.Channel#basicAck()

    /**
     * @param deliveryTag 交付标记
     * @param multiple 是否批量确认,true:将一次性ack这条 channel上所有小于deliveryTag的消息。
     */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    
  • com.rabbitmq.client.Channel#basicNack()

    /**
     * @param deliveryTag 交付标记
     * @param multiple 是否批量拒绝,true:将一次性reject这条 channel上所有小于deliveryTag的消息。
     * @param requeue 被拒绝的消息是否重回队列
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    
  • com.rabbitmq.client.Channel#basicReject()

    // channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
    
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
    

消费端如何确认消息?

  • Consumer
public static void main(String[] args) throws Exception {
      Connection connection = ConnectionUtil.getConnection();
      Channel channel = connection.createChannel();
      channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
              BuiltinExchangeType.DIRECT);
      boolean durable = true;
      channel.queueDeclare(QueueCostant.CONSUMER_ACK, durable, false, false, null);
      channel.queueBind(QueueCostant.CONSUMER_ACK, ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack");
      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), "UTF-8");
          System.out.println("ConsumerTag is [" + consumerTag + "]," +
                  " [x] Received '" + message + "'," +
                  " DeliveryTag is [" + delivery.getEnvelope().getDeliveryTag() + "]," +
                  " Thread is [" + Thread.currentThread().getName() + "]"
          );
          // 模拟处理消息的耗时
          try {
              TimeUnit.SECONDS.sleep(10);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          // 返回消息确认
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
          System.out.println(" [x] Done");

      };
      boolean autoAck = false;
      channel.basicConsume(QueueCostant.CONSUMER_ACK, autoAck, deliverCallback, consumerTag -> {
      });
  }
  • Pubisher
public static void main(String[] args) throws Exception {
      Connection connection = ConnectionUtil.getConnection();
      Channel channel = connection.createChannel();
      channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
              BuiltinExchangeType.DIRECT);
      for (int i = 1; i <= 10; i++) {
          // 推送持久化消息
          String message = "Send message " + i;
          channel.basicPublish(ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack", false, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
          System.out.println(" [x] sent ' " + message + " '");
      }
  }

我们先启动生产者向消费者发送10条消息,此时消费者还没有被启动(交换机对应的队列已经被声明)。

此时有10条消息存储在队列里,等待被消费者消费。然后我们启动消费者,消息队列中的消息全部被发送给消费者,此时消费者消费了3条消息,并向RabbitMQ返回了确认

然后我们将消费者STOP模拟消费者挂掉的情况,此时没有被确认的7条消息,重新在消息队列中恢复,等待被发送给存活的消费者。

接下来我们讨论另外一个问题,如果在消费者处理消息的时候出现了异常怎么办?RabbitMQ判断消息是否要重新投递的唯一依据是消费者了channel有没有断开,这个时候这个消息是否需要被重新处理?这种情况应该根据具体的使用场景来判断,并不是一定需要RabbitMQ重新投递消息。但是Rabbit为我们提供了一种拒绝消息并重回队列的方法。

public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
                BuiltinExchangeType.DIRECT);
        boolean durable = true;
        channel.queueDeclare(QueueCostant.CONSUMER_ACK, durable, false, false, null);
        channel.queueBind(QueueCostant.CONSUMER_ACK, ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                String message = new String(delivery.getBody(), "UTF-8");

                if (2 == delivery.getEnvelope().getDeliveryTag()) {
                    throw new IllegalStateException("消息无法被正常处理, DeliveryTag is [" + delivery.getEnvelope().getDeliveryTag() + "]");
                }

                System.out.println("ConsumerTag is [" + consumerTag + "]," +
                        " [x] Received '" + message + "'," +
                        " DeliveryTag is [" + delivery.getEnvelope().getDeliveryTag() + "]," +
                        " Thread is [" + Thread.currentThread().getName() + "]"
                );
                // 模拟处理消息的耗时
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 返回消息确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println(" [x] Done");
            } catch (Exception e) {
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            }

        };
        boolean autoAck = false;
        channel.basicConsume(QueueCostant.CONSUMER_ACK, autoAck, deliverCallback, consumerTag -> {
        });
    }

但是一般不建议使用重回队列的功能,因为被拒绝的消息会插入到消息队列的头部,这样容易导致程序陷入死循环,我们可以使用死信队列来代替重回队列的功能。

二、Publiser消息确认

当生产者向RabbitMQ的交换机发送消息的时候,我们也没有办法保证消息一定能够到达交换机,为了保证消息的可靠发送,RabbitMQ提供了事务消息和消息确认两种解决方案,由于事务消息会严重损耗RabbitMQ的性能,所以基本不会使用。我们可以使用异步的消息确认方式保证发送的消息一定到达RabbitMQ。

static SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
                BuiltinExchangeType.DIRECT);
        // confirm模式
        channel.confirmSelect();
        channel.addConfirmListener(
                (deliveryTag, multiple) -> {
                    System.out.println("Ack Callback DeliveryTag is [" + deliveryTag + "] multiple is " + multiple);
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                },

                (deliveryTag, multiple) -> {
                }
        );
        channel.addReturnListener(returnMessage -> {
            String message = new String(returnMessage.getBody());
            System.out.println("No routing message " + message);
        });

        boolean mandatory = true;
        for (int i = 1; i <= 100; i++) {
            // 推送持久化消息
            long nextSeqNo = channel.getNextPublishSeqNo();
            String message = "Send message " + nextSeqNo;
            channel.basicPublish(ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack", mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            confirmSet.add(nextSeqNo);
            System.out.println(" [x] sent ' " + message + " '");
        }
    }

当消息到达RabbitMQ被持久化后,会返回deliveryTag,异步监听消息确认可能是批量的也可能是单条的,我们可以维护一个有序的集合,存储没有被确认的消息,用来重新发送给RabbitMQ。

还有一个问题就是如果交换机没有匹配的队列或者队列还没有来的及被声明,消息也有可能被丢失,这时我们可以设置mandatory=true,添加returnListener来处理这种没有被路由的消息。也可通过设置交换机的alternate-exchange参数,来设置一个备份交换机,用来存储这些没有被路由的消息,以减少发送者程序的复杂度。

    static SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
                BuiltinExchangeType.DIRECT, true, false, createBackUpExchange(channel));
        // confirm模式
        channel.confirmSelect();
        channel.addConfirmListener(
                (deliveryTag, multiple) -> {
                    System.out.println("Ack Callback DeliveryTag is [" + deliveryTag + "] multiple is " + multiple);
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                },

                (deliveryTag, multiple) -> {
                }
        );
        boolean mandatory = false;
        for (int i = 1; i <= 100; i++) {
            // 推送持久化消息
            long nextSeqNo = channel.getNextPublishSeqNo();
            String message = "Send message " + nextSeqNo;
            channel.basicPublish(ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack", mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            confirmSet.add(nextSeqNo);
            System.out.println(" [x] sent ' " + message + " '");
        }
    }

    public static Map<String, Object> createBackUpExchange(Channel channel) throws Exception {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange", ExchangeCostant.EXCHANGE_BACKUP);

        // 声明一个广播类型的交换机
        channel.exchangeDeclare(ExchangeCostant.EXCHANGE_BACKUP, BuiltinExchangeType.FANOUT, true, false, null);
        channel.queueDeclare(QueueCostant.BACK_UP, true, false, false, null);
        channel.queueBind(QueueCostant.BACK_UP, ExchangeCostant.EXCHANGE_BACKUP, "");

        return arguments;
    }

三、RabbitMQ节点

在应用级别保障消息的可靠性已经极大的提高了应用的安全性,但是当RabbitMQ节点重启、宕机等情况依旧会导致消息丢失,所以我们还需要设置队列的持久性、消息的持久性,保证节点宕机或重启后能恢复消息。

如果是单机故障导致机器无法恢复,消息还是会被丢失,所以对于关键的消息我们要设置镜像队列和集群保证消息服务的高可用。

参考文章:blog.csdn.net/u013256816/…