RocketMQ5源码(四)任意时间延迟消息

583 阅读13分钟

前言

本章基于rocketmq5.1.1版本,分析任意时间延迟消息(下文简称Timer消息)特性的实现方式。

涉及历史文章:

  1. RocketMQ4源码(四)生产者特性
  2. RocketMQ4源码(六)HA
  3. RocketMQ5源码(二)controller模式
  4. RocketMQ5源码(三)SlaveActingMaster模式

一、案例

默认情况下,使用Timer消息不需要在任何一端修改配置,只需要在producer端使用Message相关api即可。

Message message = new Message(TOPIC, ("Hello").getBytes(StandardCharsets.UTF_8));
message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L); // 设置目标投递时间=当前时间+10s
SendResult result = producer.send(message);

Message可以通过三种方式设置延迟时间,一类是设置延迟时长,一类是直接设置投递时间戳

这两种有细微的差别,后面可以看到。

二、回顾传统延迟消息

1、基于延迟级别

传统延迟消息,仅支持通过设置延迟级别的方式发送。

DefaultMQProducer producer = new DefaultMQProducer("producer-group-1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message(
    "MyTestTopicA", // topic
    "TagA", // tag
    ("Hello Delay " + i).getBytes(StandardCharsets.UTF_8) // body
);
msg.setDelayTimeLevel(3); // 延迟级别=3 10s
SendResult sendResult = producer.send(msg); // 发送消息

默认延迟级别有18个,在broker侧可配置,即MessageStoreConfig#messageDelayLevel

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

2、发送消息

broker侧会将延迟消息转换后投递到topic=SCHEDULE_TOPIC_XXXX

投递队列与延迟级别mapping(queueId=delayLevel-1)。

3、构建ConsumeQueue

broker构建ConsumeQueue时,将原来应该存储tag的hashCode的位置,改为存储目标投递时间戳

4、投递到真实队列

broker侧,ScheduleMessageService服务会启动后台线程(4.9开始每个延迟级别一个线程,4.9之前是JDK的Timer,一个线程处理所有延迟级别),消费SCHEDULE_TOPIC_XXXX中每个队列的消息。

如果ConsumeQueue中记录的投递时间到期,则投递到用户原始消息topic和queue;

如果ConsumeQueue中记录的投递时间未到期,则延迟一阵子再消费;

5、思考

传统延迟消息,延迟级别有限可穷举,可以mapping到队列数量。

这就能保证每个队列中的消息是按照目标投递时间有序(目标投递时间=生产消息时间+固定延迟)。

而如果要实现Timer消息,这就行不通了。

因为commitlog和consumequeue都是顺序写的,没办法做到早投递的消息晚触达,晚投递的消息早触达。

所以归根结底,Timer消息需要能按照时间有序的方式,消费系统topic,投递到用户topic。

三、Timer消息

1、转换wheel_timer消息

HookUtils#handleScheduleMessage:broker收到producer消息后,这里会处理延迟消息逻辑。

HookUtils#transformTimerMessage:

Step1,将producer端设置延迟时间的不同方式,统一转换为目标投递时间戳

这就是producer在使用Message api上不同方式的微妙的区别:

  1. 如果设置延迟时间,目标投递时间用broker机器时间计算;
  2. 如果设置目标投递时间,使用producer机器时间;

Step2,最大延迟时间拦截。

Timer消息也并非完全任意时间延迟,有最大延迟时间,默认配置timerMaxDelaySec=3天。

Step3,延迟时间精度处理。

默认时间精度timerPrecisionMs=1000ms,就是目标投递时间会取整到1s。

Step4,流控。

对于时间轮Slot级别流控,默认阈值Integer.MAX_VALUE,这里不深入分析。

Step5,转换wheel_timer消息。

将用户topic和queue放在properties中;

将目标topic替换为rmq_sys_wheel_timer,将目标queue替换为0

将计算后的目标投递时间戳放入properties中。

至此Timer消息转换结束,投递到系统topic中,Timer消息对应的ConsumeQueue并不会做特殊处理

2、构造时间轮(enqueue)

rmq_sys_wheel_timer消息会由两个线程处理:TimerEnqueueGetServiceTimerEnqueuePutService

生成TimerRequest

TimerMessageStore#enqueue:

rmq_sys_wheel_timer只有一个队列,TimerEnqueueGetService单线程消费。

将每条消息转换为TimerRequest放入内存队列enqueuePutQueue

消费TimerRequest

TimerEnqueuePutService#run:

TimerEnqueuePutService线程消费TimerRequest,更新rmq_sys_wheel_timer的内存消费进度commitQueueOffset

TimerEnqueuePutService#run:

  1. delayTime<currWriteTimeMs,如果消息已经到期,入队dequeuePutQueue;(直接进入最后一步TimerDequeuePutMessageService线程处理)
  2. 如果消息未到期,进入时间轮

Timer消息的时间轮分为两部分数据:TimerWheelTimerLog

TimerWheel按照默认时间精度包含2天(timerRollWindowSlot)/1s个Slot

TimerMessageStore#doEnqueue:

Step1,处理滚动逻辑

对于延迟小于2天,直接能根据延迟时间找到对应槽位;

对于延迟超出2天的时间需要执行滚动,22.66天会放在1天的槽位上,2.663天(前面限制最多3天)会放在2天的槽位上。

TimerMessageStore#doEnqueue:

Step2,根据延迟时间定位时间轮Slot,顺序写入TimerLog(timerlog目录下多个定长100M文件)

TimerLog包含几个关键字段:

  1. slot.lastPos:同一个Slot中的消息在TimerLog中形成单向链表,用于后续遍历;
  2. offsetPy/sizePy:原始消息的物理offset和大小,用于后续定位消息;
  3. magic:是否滚动、是否删除;
  4. tmpWriteTimeMs:写时间;
  5. delayedTime-tmpWriteTimeMs:相较于写时间的延迟秒数;

TimerMessageStore#doEnqueue:

Step3,随机写,更新TimerWheel中Slot信息。(timerwheel文件,默认2天/1s个slot大小

TimerWheel#putSlot:Slot不存储实际数据,是指向TimerLog的指针

  1. timeMs/precisionMs:延迟时间;
  2. firstPos:Slot中第一个消息对应timerLog的物理offset;
  3. lastPos:Slot中最后一个消息对应timerLog的物理offset;

TimerWheel#getSlotIndex:根据目标延迟时间找Slot按照slot数量取模。

从timerwheel文件大小的角度来看,实际底层有2倍slotsTotal个Slot,写死14天,但这并不妨碍由timerRollWindowSlot控制按照2天滚动。

3、处理时间轮(dequeue)

当消息进入时间轮后就可以按照时间顺序处理了。

重建TimerRequest

TimerMessageStore#dequeue:

Step1,TimerDequeueGetService单线程根据时间找到时间轮对应Slot

这部分消息可能是未到期需要滚动的(2-3天),也可能是已经到期的(2天内),这会在最后一步处理。

TimerMessageStore#dequeue:

Step2,根据Slot的lastPos从后向前依次处理TimerLog记录。

TimerMessageStore#dequeue:

Step3,将TimerLog记录重新组装为TimerRequest。

TimerMessageStore#dequeue:

Step4,将TimerRequest分批放入内存队列dequeueGetQueue,CDL等待处理完成。

填充TimerRequest

从这一步开始可以多线程处理(默认3个线程),因为所有TimerRequest都需要处理(滚动或投递)。

TimerDequeueGetMessageService#run:

TimerDequeueGetMessageService线程的作用就是,从commitlog读消息,注入TimerRequest,提交TimerRequest到下一个内存队列。

滚动or投递

多线程处理,默认3个线程。

TimerDequeuePutMessageService#run:将消息转换,再次投递消息。

TimerMessageStore#convertMessage:滚动还是投递(在构造时间轮时确定),区别就在消息转换这一步。

消息到期,直接投递到用户topic和queue即可。

滚动实际的操作是再次投递消息到rmq_sys_wheel_timer,即重新发送一条Timer消息,重新走上述流程。

所以如果延迟时间超过滚动阈值(2天),会增加消息写commitlog次数

四、HA

1、checkpoint

Timer消息的流转大致是延迟topic-时间轮-用户topic,涉及三份逻辑上的物理文件,所以有三个数据需要记录。

这三个数据都记录在内存TimerCheckpoint中。

  1. lastTimerQueueOffsetrmq_sys_wheel_timer消费进度。传统延迟消息记录在delayOffset.json中,而Timer消息只需要记录queueId=0的一个消费进度即可;
  2. lastTimerLogFlushPos时间轮刷盘进度。rmq_sys_wheel_timer消费的目的是写入时间轮timerLog和timerWheel。timerLog是顺序写,可以记录刷盘进度,timerWheel是随机写,可以通过timerLog恢复;
  3. lastReadTimeMs时间轮处理进度。即到期时间t之前的消息已经投递到目标队列(或滚动);
public class TimerCheckpoint {
    // 时间轮处理进度 --- 时间戳
    private volatile long lastReadTimeMs = 0;
    // TimerLog刷盘进度
    private volatile long lastTimerLogFlushPos = 0;
    // 延迟topic(wheel_timer)消费进度 --- 当前实例
    private volatile long lastTimerQueueOffset = 0;
    // 延迟topic(wheel_timer)消费进度 --- master
    private volatile long masterTimerQueueOffset = 0;
}

TimerMessageStore.TimerFlushService#run:TimerFlushService线程每隔1s处理checkpoint和时间轮刷盘。

TimerMessageStore#prepareTimerCheckPoint:

根据运行时的情况,收集各个线程的处理进度,包括时间轮刷盘,时间轮处理,timer消息消费等。

最终checkpoint在磁盘上存储在config目录的timercheck文件中。

2、重启恢复

TimerMessageStore#recover:broker重启初始化阶段,根据checkpoint文件和物理文件恢复内存数据。

恢复时间轮

TimerLog会从checkpoint记录的刷盘进度往前推100M(一个TimerLog定长文件的大小)开始处理。

TimerMessageStore#recoverAndRevise:

TimerLog数据完整性校验(size和magic),修正TimerWheel中对应延迟时间Slot的lastPos指向,返回TimerLog实际刷盘进度。

private long recoverAndRevise(long beginOffset, boolean checkTimerLog) {
    // S1,从入参offset开始(checkpoint-100M),找offset所在TimerLog文件,从那个文件开始处理
    List<MappedFile> mappedFiles = timerLog.getMappedFileQueue().getMappedFiles();
    int index = mappedFiles.size() - 1;
    for (; index >= 0; index--) {
        MappedFile mappedFile = mappedFiles.get(index);
        if (beginOffset >= mappedFile.getFileFromOffset()) {
            break;
        }
    }
    if (index < 0) {
        index = 0;
    }
    // S2,循环TimerLog文件
    long checkOffset = mappedFiles.get(index).getFileFromOffset();
    for (; index < mappedFiles.size(); index++) {
        MappedFile mappedFile = mappedFiles.get(index);
        SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0, checkTimerLog ? mappedFiles.get(index).getFileSize() : mappedFile.getReadPosition());
        ByteBuffer bf = sbr.getByteBuffer();
        int position = 0; // TimerLog进度
        boolean stopCheck = false;
        // 每条TimerLog记录52B
        for (; position < sbr.getSize(); position += TimerLog.UNIT_SIZE) {
            try {
                bf.position(position);
                int size = bf.getInt();//size
                bf.getLong();//prev pos
                int magic = bf.getInt();
                if (magic == TimerLog.BLANK_MAGIC_CODE) {
                    break;
                }
                // S2-1,magic为0处理结束
                if (checkTimerLog && (!isMagicOK(magic) || TimerLog.UNIT_SIZE != size)) {
                    stopCheck = true;
                    break;
                }
                // 恢复delayTime = TimerLog写入时间戳+延迟毫秒数
                long delayTime = bf.getLong() + bf.getInt();
                if (TimerLog.UNIT_SIZE == size && isMagicOK(magic)) {
                    // S2-2,修正delayTime对应Slot的lastPos指向
                    timerWheel.reviseSlot(delayTime, TimerWheel.IGNORE, sbr.getStartOffset() + position, true);
                }
            } catch (Exception e) {
                LOGGER.error("Recover timerLog error", e);
                stopCheck = true;
                break;
            }
        }
        sbr.release();
        // TimerLog实际进度
        checkOffset = mappedFiles.get(index).getFileFromOffset() + position;
        if (stopCheck) {
            break;
        }
    }
    // S3,根据TimerLog数据完整情况,截断物理文件
    if (checkTimerLog) {
        timerLog.getMappedFileQueue().truncateDirtyFiles(checkOffset);
    }
    return checkOffset; // TimerLog刷盘进度
}

修正wheel_timer消费进度

修正rmq_sys_wheel_timer的消费进度,就是修正从哪里开始构建时间轮。

TimerMessageStore#recover:

rmq_sys_wheel_timer的消费进度并非完全取决于checkpoint,优先会按照TimerLog的刷盘进度来恢复。

可以理解,TimerLog写成功,才代表rmq_sys_wheel_timer消费成功。

TimerMessageStore#reviseQueueOffset:

从TimerLog最后一条记录开始向前匹配ConsumeQueue。

直到找到TimerLog记录的CommitLog消息位置和ConsumeQueue记录的CommitLog消息位置一致。

返回对应消息在ConsumeQueue中的逻辑offset。

public long reviseQueueOffset(long processOffset) {
    // 1. 从timerLog读取最后一条记录
    SelectMappedBufferResult selectRes = timerLog.getTimerMessage(processOffset - (TimerLog.UNIT_SIZE - TimerLog.UNIT_PRE_SIZE_FOR_MSG));
    if (null == selectRes) {
        return -1;
    }
    long offsetPy = selectRes.getByteBuffer().getLong();
    int sizePy = selectRes.getByteBuffer().getInt();
    // 2. 从commitlog读取对应消息
    MessageExt messageExt = getMessageByCommitOffset(offsetPy, sizePy);
    if (null == messageExt) {
        return -1;
    }
    long msgQueueOffset = messageExt.getQueueOffset();
    int queueId = messageExt.getQueueId();
    // 3. 找到ConsumeQueue
    ConsumeQueue cq = (ConsumeQueue) this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId);
    if (null == cq) {
        return msgQueueOffset;
    }
    long cqOffset = msgQueueOffset;
    long tmpOffset = msgQueueOffset;
    int maxCount = 20000;
    // 4. TimerLog匹配ConsumeQueue
    while (maxCount-- > 0) {
        if (tmpOffset < 0) {
            break;
        }
        // 根据commitlog中的消息的逻辑offset,找consumequeue记录
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(tmpOffset);
        if (null == bufferCQ) {
            tmpOffset -= 1; // 向前继续找
            continue;
        }
        long offsetPyTemp = bufferCQ.getByteBuffer().getLong();
        int sizePyTemp = bufferCQ.getByteBuffer().getInt();
        // TimerLog记录的消息物理位置 匹配 ConsumeQueue的消息物理位置
        if (offsetPyTemp == offsetPy && sizePyTemp == sizePy) {
            cqOffset = tmpOffset;
            break;
        }
        tmpOffset -= 1; // 向前继续找
        // ...
    }
    return cqOffset;
}

恢复时间轮处理进度

TimerMessageStore#recover:时间轮的处理进度readTimeMs有特殊处理。

如果checkpoint中的lastReadTimeMs距离现在超过7天(slotsTotal),则从7天前的时间戳开始处理时间轮。

即:

  1. 历史7天之前的数据,不会再投递给客户端,这部分数据丢失;
  2. 历史7天内的数据,仍然会投递给客户端,即使已经不符合目标投递时间要求;

历史7天之前不清理,是否会导致14天(getSlotIndex)后读到历史数据?

TimerWheel#getSlot:在处理时间轮时,getSlot会做兼容处理,如果currReadTimeMs不等于Slot的timeMs会返回-1。

具体例子,假设将当前时间放入时间轮,过了14天后来取,处于同一Slot,但是取出来的数据都是-1,不会投递。

@Test
public void testExpireData() throws IOException {
    int slotsTotal = 7 * 24 * 3600; // 7天
    int precisionMs = 1000; // 1s精度
    String dir = StoreTestUtils.createBaseDir();
    TimerWheel tw = new TimerWheel(dir, slotsTotal, precisionMs);
    // 将当前时间放入时间轮
    long now = System.currentTimeMillis() / precisionMs * precisionMs;
    tw.putSlot(now, 1, 2);
    Slot slot = tw.getSlot(now);
    assertEquals(now, slot.timeMs);
    assertEquals(1, slot.firstPos);
    assertEquals(2, slot.lastPos);
    // 14天后来取
    long now_plus14 = now + TimeUnit.DAYS.toMillis(14) / precisionMs * precisionMs;
    // 处于同一Slot
    assertEquals(tw.getSlotIndex(now), tw.getSlotIndex(now_plus14));
    // 查询14天后数据
    Slot slot14 = tw.getSlot(now_plus14);
    assertEquals(-1, slot14.timeMs);
    assertEquals(-1, slot14.lastPos); // 失效
    assertEquals(-1, slot14.firstPos);
    tw.shutdown();
    StoreTestUtils.deleteFile(dir);
}

3、复制

复制的是什么

Timer消息新增了三个文件:timercheck(checkpoint)、timerlog、timerwheel,实际复制的是什么?

BrokerController#initializeBrokerScheduledTasks:只复制checkpoint(timercheck文件)

checkpoint的复制方式和config目录下其他文件一致(消费进度、topic配置等),slave主动从master拉取,只不过频率更高,3s一次。

SlaveSynchronize#syncTimerCheckPoint:slave从master同步checkpoint

  1. 时间轮的处理进度lastReadTimeMs
  2. wheel_timer消息的消费进度masterTimerQueueOffset

slave构造时间轮(enqueue)

只同步checkpoint,需要slave消费wheel_timer消息,构造时间轮。

TimerEnqueueGetService线程需要判断是否执行enqueue。

TimerMessageStore#isRunningEnqueue:

  1. 普通master-slave情况下,如果slave自己的wheel_timer消费进度未赶上master的进度,返回true需要执行enqueue,其实和slave同步commitlog后构造consumequeue逻辑是一样的;
  2. slaveActingMaster模式下,master下线,最小brokerId成为代理master,shouldRunningDequeue=true,需要执行enqueue;
  3. 5.x的controller模式/4.x的DLedgerCommitLog模式,运行时broker角色会发生变化,所以要先checkBrokerRole后判断是否需要走enqueue;

4、slave处理时间轮(dequeue)

一般情况下slave不会处理时间轮,即将到期消息投递到目标队列。

这里主要是看一下slaveActingMaster模式下,代理master的处理。

TimerMessageStore#isRunningDequeue:

成为代理master后,shouldRunningDequeue=true,会执行时间轮处理。

TimerMessageStore#doPut:

在最后滚动和投递阶段,同样会走消息逃逸逻辑escapeBridgeHook(本地or远程)。

注:在5.1.4版本及以前,重试逻辑里丢失了消息逃逸逻辑,也没太大影响,我顺便提了PR修复了。

总结

Timer消息大流程分为三步:

  1. 用户消息转换为wheel_timer消息(topic=rmq_sys_wheel_timer,queueId=0);
  2. 构造时间轮(enqueue),包含timerlog和timerwheel两个新文件;
  3. 处理时间轮(dequeue),按照时间顺序,将到期消息(或滚动)投递到目标队列;

实现细节上,Timer消息在broker侧用5组线程处理:

时间轮由2个部分组成:

  1. timerlog,顺序写,同样延迟时间的记录形成链表结构,也记录了消息在commitlog中的物理位置;
  2. timerwheel,随机写,存储n个时间槽Slot,每个槽指向timerlog中的一条记录(主要是last指针)。每次新timerlog生成,都会定位Slot后修改指针指向;

在dequeue阶段,通过到期时间戳定位timerwheel中的一个槽,再从timerlog中读取commitlog位置定位消息。

需要注意时间轮上的几个参数限制:

  1. 时间轮总槽位:固定写死14天;
  2. 时间轮精度:timerPrecisionMs,默认1000ms,槽位=(目标投递时间戳/timerPrecisionMs)%槽位数;
  3. 最大延迟时间:timerMaxDelaySec,默认3天,延迟超过3天发送timer消息失败;
  4. 时间轮滚动阈值:timerRollWindowSlot,默认2天,即2-3天延迟时间的消息都需要滚动,多次写rmq_sys_wheel_timer消息;

broker用config/timercheck文件存储checkpoint信息,用于重启后恢复和HA复制,包括:

  1. lastTimerQueueOffsetrmq_sys_wheel_timer消费进度
  2. lastTimerLogFlushPostimerlog时间轮刷盘进度
  3. lastReadTimeMstimerwheel时间轮处理进度

HA复制仅复制timercheck文件,slave每隔3s从master拉取checkpoint更新到本地。

时间轮(timerlog和timerwheel)需要slave按照rmq_sys_wheel_timer消费进度(checkpoint)自己构建。

slaveActingMaster模式下,代理master支持处理时间轮,将到期(或需要滚动)的逃逸到本地(container)或远程。

参考文档

  1. RIP-43

欢迎大家评论或私信讨论问题。

本文原创,未经许可不得转载。

欢迎关注公众号【程序猿阿越】。