RocketMQ消费进度管理浅析

3,593 阅读8分钟

幂等性的取与舍

      分布式平台上幂等性相关语义的保证,是我们构造安全、可信赖系统的永恒追求。作为异步、解耦通常实现方案下的最优选,我时常思考Rocket MQ设计者经历怎样的断舍离?

      众所周知消息队列关于消息消费这一概念的落地实现,大体上分为三种情形:

  • At most once
  • At least once
  • Exactly once

      翻译一下就是:

  • 至多消费一次
  • 至少消费一次
  • 保证消费一次

      很显然如果至多消费一次,势必造成消息丢失;至少消费一次就对我们的业务系统提出更高的要求,保证消费一次看似美好时则需要MQ系统背负沉重代价。Rocket MQ丝毫不犹豫的选择At least once。将幂等的保证大胆的交给开发者,不仅仅体现作者对MQ性能与功能两者矛盾的无奈,同时也体现了对广大开发人员的信任。

消费现状概述

      上述论调虽然客观真实但不免有些悲观主义的意味,按照上文的理解我们业务体统需要倚仗ta,但我们又要时刻防备ta,因为一个不小心可能就会出纰漏,这还真是一个让人又爱又怕的存在。

      读到这里,笔者似乎把ta描绘成了一个顽皮的孩子,但其实有些言重了,因为以我阅读源码的理解,业务系统没有异常,MQ所在的物理运行环境又比较健康的情况下,其实比较难以出现多次重复消费。

      RocketMQ的幂等往往是由业务系统的异常逻辑,或者网络,或者不确定的运行环境破坏的。绝大多数情形下确定无疑ta依然是一个Good Boy。

      按照我们对消息系统的朴素理解,消息的消费过程满足以下几个规律:

  • 虽然不会严格的按照投递顺序进行消费,但大体上保持先进先出这个趋势
  • 消息应该被精确的记录当前消费状态
  • 总有一个角色负责统计、持久化消费偏移量

      带着经验主义我们看看作者都为平稳消费与进度管理做出了哪些努力。

⚠️注:Rocket MQ的顺序消费模型是可以严格保证顺序的。

OffsetStore

      消息被消费后也就失去了在ProcessQueue中停留的资格,ProcessQueue会删除该消息,并返回当前的最小偏移量放置到消息进度表中。很容易想象,如果这个消费进度不加以持久化,那么每次启动都要重头消费,显然无法接受,可是如何持久化,又持久化到何处呢?

      Rocket MQ支持两种订阅模式:

  • 集群消费模式:默认的消费模式,所有消息只需要被同组任一消费者消费一次即可,大家共享订阅Topic下的消费偏移量。

  • 广播消费模式:各个消费者的消费行为是完全独立的,订阅Topic下所有的消息都需要被该组下所有消费者消费。

      针对两种消费模型的特性,容易发现二者并不好一概而论,理想的实现是划分为两个策略,一个集中到Broker管理,一个分散出去由消费者管理。OffsetStore接口负责相关事宜,源码应证了我们猜想。 image.png       先来看看OffsetStore接口定义:

public interface OffsetStore {

    /**
     * 从消息进度存储文件加载消息进度到内存
     */
    void load() throws MQClientException;
    
    /**
     * Get offset from local storage
     * @return The fetched offset
     */
    long readOffset(MessageQueue mq, ReadOffsetType type);

    /**
     * Remove offset
     */
    void removeOffset(MessageQueue mq);

    Map<MessageQueue, Long> cloneOffsetTable(String topic);
    
    /**
     * 更新内存中的消息进度
     * Update the offset,store it in memory
     */
    void updateOffset(MessageQueue mq, long offset, boolean increaseOnly);
    
    /**
     * 保留所有偏移量,可能在本地存储或远程服务器
     * Persist all offsets,may be in local storage or remote name server
     */
    void persistAll(Set<MessageQueue> mqs);

    /**
     * 保留指定消息队列偏移量,可能在本地存储或远程服务器
     * Persist the offset,may be in local storage or remote name server
     */
    void persist(MessageQueue mq);

    /**
     * 更新存储在 Broker 端的消息消费进度,使用集群模式
     */
    void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)
        throws RemotingException, MQBrokerException,
               InterruptedException, MQClientException;

}

      较之源码,方法排列被我调换了顺序,需要着重关注的我放到了后面。

⚠️注:如果没有Rocket MQ源码阅读经历ProcessQueue显得有些突兀,你可以将ta理解为消息在Consumer端的载体、物理队列某一个截取片段。作者如此定义ta:Queue consumption snapshot

LocalFileOffsetStore

      广播模式下消息进度保留在Consumer端,文件遵守约定放置在可配置的固定目录下,文件路径如下:

public class LocalFileOffsetStore implements OffsetStore {

    /**
     * 存储文件夹路径可定制
     */
    public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
        "rocketmq.client.localOffsetStoreDir",
        System.getProperty("user.home") + File.separator + ".rocketmq_offsets"
    );
    
    /**
     * 构造方法拼接出了文件的完整路径
     */
    public LocalFileOffsetStore(MQClientInstance mQClientFactory, 
        String groupName) {
        this.mQClientFactory = mQClientFactory;
        this.groupName = groupName;
        this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
            this.mQClientFactory.getClientId() + File.separator +
            this.groupName + File.separator +
            "offsets.json";
    }

}

      默认在用户路径下一层创建一个".rocketmq_offsets"文件夹,注意这里有一个细节,文件夹以"."开头,在Linux系统中属于隐藏文件,需要加-a参数才能被显示。为了便于理解,下图展示了一个文件夹路径和一个Offset持久化文件的路径。

image.png

      广播模式下Consumer#start()之后会调用OffsetStore.load()来加载消费进度,其原理就是根据约定拼接处文件全路径之后读取相应文件,然后序列化为OffsetSerializeWrapper对象:

public class OffsetSerializeWrapper extends RemotingSerializable {

    /* 详细记录每个队列当前消费进度 */
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable
        = new ConcurrentHashMap<>();
    
}

      假设我们有个发送短信的服务订阅"SMS_prod"Topic,那么形成的Json如下所示:注意offsetTable属性也是一个Json,而且key是MessageQueue对象,valule是一个数字表示偏移量。

{
    "offsetTable": {
        {
            "topic": "SMS_prod",
            "brokerName": "broker0"
            "queueId": 0
        }: 100,
        
        {
            "topic": "SMS_prod",
            "brokerName": "broker0"
            "queueId": 1
        }: 100,
    }
}

      既然可以在指定文件load关键信息,自然就有相关机制负责写入。还记得上文提到的persistAll方法吗?

public void persistAll(Set<MessageQueue> mqs) {
    /* 构造OffsetSerializeWrapper对象 */
    OffsetSerializeWrapper offsetSerializeWrapper = 
        new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }
    /* 将offsetSerializeWrapper对象序列化 */
    String jsonString = offsetSerializeWrapper.toJson(true);
    /* 将序列化好的offsetSerializeWrapper写入文件 */
    MixAll.string2File(jsonString, this.storePath);
}

      对offsets.json的相关操作都被封装在MixAll工具类中:

  • MixAll.file2String: 将文件读取出来
  • MixAll.string2File: 将序列化好的对象写入文件

RemoteBrokerOffsetStore

      因为偏移量维护在Broker端,所以该实现的load方法仅仅是一个声明。构造方法不需要计算文件路径也尤为简单,二者的offsetTable属性是一致的。我们着重来看看集群消费模式下如何保存消息消费进度。

public void persistAll(Set<MessageQueue> mqs) {
    HashSet<MessageQueue> unusedMQ = new HashSet<>();

    for (Map.Entry<MessageQueue, AtomicLong> entry : offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        AtomicLong offset = entry.getValue();
        if (offset != null) {
            if (mqs.contains(mq)) {
                this.updateConsumeOffsetToBroker(mq, offset.get());
            } else {
                unusedMQ.add(mq);
            }
        }
    }

    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
        }
    }
}

      不用深入研究,我们应该能发现至少两处不同:

  • 粒度不同:广播模式是直接一下子把整个offsetTable持久化,而集群模式细化到了entry级别。
  • 调用方式不同:广播模式是直接JVM内部调用写入文件即可,而集群模式需要RPC调用参与。       这里有必要强调一下二者产生的offset.json文件也是有区别的,下文我会分析,同时也带大家了解该RPC过程。
RPC调用栈:

    RemoteBrokerOffsetStore#persistAll()
->  RemoteBrokerOffsetStore#updateConsumeOffsetToBroker()
    组装好RPC请求头UpdateConsumerOffsetRequestHeader对象
->  MQClientAPIImpl#updateConsumerOffsetOneway()
    组装好RPC请求对象RemotingCommand
->  NettyRemotingClient#invokeSync()
    发起RPC调用
    
更新偏移量的RPC调用类型是RequestCode.UPDATE_CONSUMER_OFFSET
顺着这个枚举来看看Broker端的相关处理:

    ConsumerManageProcessor.updateConsumerOffset()
->  ConsumerOffsetManager.commitOffset() 

      追踪源码发现,其实每次Consumer进行RPC调用上报自己的消费进度,Broker接收之后并没有立即进行持久化,而是直接更新到内存中。

private void commitOffset(String clientHost, String key, 
    int queueId, long offset) {
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    
    ConcurrentMap<Integer, Long> map = offsetTable.get(key);
    if (Objects.isNull(map)) {
        map = new ConcurrentHashMap<>(32);
        map.put(queueId, offset);
        this.offsetTable.put(key, map);
    } else {
        Long storeOffset = map.put(queueId, offset);
    }
}

      TOPIC_GROUP_SEPARATOR为定义的常量: "@",之前我们提到过二者json有些许区别,offsetTable的key变成了一个拼接出来的字符串,该字符串左侧是TopicName,右侧是ConsumeGroupName中间用@符号连接。方便理解,我把这个json也展示出来:

/**
 * 注意一下这个key:%RETRY%ConsumeGroup
 * 笔者后期会有专门文章分析
 */
{
    "offsetTable": {
        "Topic@ConsumeGroup":{
            0: 38,
            1: 37,
            2: 50,
            3: 10
         },
         "%RETRY%ConsumeGroup": {
            0: 0
         }
      }
}

持久化

      两种文件持久化机制没有什么大的区别定时任务触发,或者消费端正常关闭执行shotdown()之前手动触发。

      广播模式定时任务定义在MQClientInstance中,MQClientInstance对象在被实例化之后调用start()时启动该定时任务。定时任务的时间间隔支持配置默认是5000ms,延时10000ms之后开始执行。

public void start() throws MQClientException {
    this.scheduledExecutorService.scheduleAtFixedRate(
        () -> {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        },
        1000 * 10,
        this.clientConfig.getPersistConsumerOffsetInterval(),
        TimeUnit.MILLISECONDS
    );
}

      集群模式定时任务定义BrokerController中,BrokerController对象在被实例化之后会有一系列初始化动作,initialize()会启动该定时任务。定时任务的时间间隔支持配置默认是5000ms,延时10000ms之后开始执行。

public boolean initialize() throws CloneNotSupportedException {
    this.scheduledExecutorService.scheduleAtFixedRate(
        () -> {
            try {
                BrokerController.this.consumerOffsetManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumerOffset error.", e);
            }
        },
        1000 * 10,
        this.brokerConfig.getFlushConsumerOffsetInterval(),
        TimeUnit.MILLISECONDS
    );
}

重复消费

      原理分析了那么久,我想要传达的观点就是正常使用的前提下重复消费的原因一定跟offset上报,持久化有关系。

  • 集群消费过程中Consumer意外宕机,offset没有上报导致重复消费

  • 集群消费过程中Broker意外宕机,offset没有将最新的偏移量持久化导致重复消费

  • 广播消费过程Consumer意外宕机,offset没有持久化到本地文件导致重复消费

  • offset.json文件意外损坏或删除,进度丢失导致重复消费

  • offset.json文件被篡改,进度不准确导致重复消费       还有一种是因为开发者返回了错误的ACK标示,导致Rocket误判以为消费失败,触发重试逻辑导致的重复消费。