RocketMQ Producers and Message Storage

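1. Introduction

This article walks through the RocketMQ source code for sending and storing messages. RocketMQ offers three ways to send a message: reliable synchronous send, reliable asynchronous send, and one-way send. In terms of storage model, today's MQ middleware falls into two groups: those that persist messages and those that do not. This article also analyzes RocketMQ's message storage mechanism.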

2. RocketMQ Messages

Let's start with RocketMQ's message wrapper class, org.apache.rocketmq.common.message.Message.

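Basic attributes: topic, flag, body, and extended properties.

Hidden attributes:

  • tag: the message tag, used for message filtering
  • keys: the message index keys
  • waitStoreMsgOK: whether a send waits for the message to be stored before returning
  • delayTimeLevel: the message delay level, used for scheduled messages and message retries

These extended attributes are all stored in the properties map of Message.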
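As a rough illustration of the point above, the sketch below keeps tag, keys, waitStoreMsgOK and delayTimeLevel in a single properties map. SketchMessage and the key strings it uses are hypothetical stand-ins chosen for this example, not the real Message class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Message; only meant to show the "everything
// extra lives in properties" idea.
class SketchMessage {
    // Illustrative key names (assumptions for this sketch).
    static final String KEY_TAGS = "TAGS";
    static final String KEY_KEYS = "KEYS";
    static final String KEY_WAIT = "WAIT";
    static final String KEY_DELAY = "DELAY";

    private final String topic;
    private final byte[] body;
    private final Map<String, String> properties = new HashMap<>();

    SketchMessage(String topic, byte[] body) {
        this.topic = topic;
        this.body = body;
    }

    // Each "hidden attribute" setter is just a put into the properties map.
    void setTags(String tags) { properties.put(KEY_TAGS, tags); }
    void setKeys(String keys) { properties.put(KEY_KEYS, keys); }
    void setWaitStoreMsgOK(boolean wait) { properties.put(KEY_WAIT, Boolean.toString(wait)); }
    void setDelayTimeLevel(int level) { properties.put(KEY_DELAY, Integer.toString(level)); }

    String getTags() { return properties.get(KEY_TAGS); }

    // A missing entry is treated as "no delay".
    int getDelayTimeLevel() {
        String value = properties.get(KEY_DELAY);
        return value == null ? 0 : Integer.parseInt(value);
    }

    String getTopic() { return topic; }
    byte[] getBody() { return body; }
    Map<String, String> getProperties() { return properties; }
}
```

Keeping these attributes in a map means new attributes can be added without changing the fixed fields of the message.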

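3. Producer Startup Flow

We trace the flow starting from the start method of DefaultMQProducerImpl.

Step 1: check that producerGroup is valid, and change the producer's instanceName to the process ID (unless the group is the client's internal producer group).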

//DefaultMQProducerImpl::start
public void start() throws MQClientException {
    this.start(true); // startFactory defaults to true
}

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // Step 2: create (or reuse) the MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

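Step 2: create the MQClientInstance.

Step 3: register with the MQClientInstance, so that the current producer is managed by it; this makes later network requests, heartbeats, and so on easier.

Step 4: start the MQClientInstance. If it is already running, this call does not start it again.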
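The startup steps can be sketched roughly as follows. SketchClientInstance and SketchProducer are hypothetical classes written for this example; they only mimic the state switch, the group registration, and the "start once" behavior described above, and leave out config checks and heartbeats.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the shared client instance.
class SketchClientInstance {
    private final ConcurrentMap<String, Object> producerTable = new ConcurrentHashMap<>();
    private boolean started = false;
    private int startCount = 0;

    // Returns false when the group name is already taken.
    boolean registerProducer(String group, Object producer) {
        if (group == null || producer == null) {
            return false;
        }
        return producerTable.putIfAbsent(group, producer) == null;
    }

    // Starting an already started instance does nothing.
    synchronized void start() {
        if (started) {
            return;
        }
        started = true;
        startCount++;
    }

    synchronized int getStartCount() { return startCount; }
}

// Hypothetical stand-in for the producer implementation.
class SketchProducer {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private final String group;
    private final SketchClientInstance client;

    SketchProducer(String group, SketchClientInstance client) {
        this.group = group;
        this.client = client;
    }

    void start() {
        switch (state) {
            case CREATE_JUST:
                // Assume failure until every step has succeeded.
                state = State.START_FAILED;
                if (!client.registerProducer(group, this)) {
                    state = State.CREATE_JUST;
                    throw new IllegalStateException("producer group already registered: " + group);
                }
                client.start();
                state = State.RUNNING;
                break;
            default:
                throw new IllegalStateException("producer state not OK: " + state);
        }
    }

    State getState() { return state; }
}
```

Setting the state to START_FAILED before doing any work means an exception halfway through leaves the producer in a state that cannot be started again by accident.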

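4. Basic Message Sending Flow

Sending a message consists of three main steps: validating the message, looking up the route, and sending the message (including exception handling).

Validation mainly checks the message length, so this section focuses on route lookup and on the send itself.

4.1 Route Lookup

Before a message can be sent, the producer must first obtain the topic's route information.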

//DefaultMQProducerImpl::tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

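If the producer has cached route information for the topic and that information contains message queues, it is returned directly. If nothing is cached, or the cached entry contains no message queues, the producer queries the NameServer for the topic's route information. If that still finds nothing (for example on the first send to a topic), the producer tries again using the default topic. If no route is found even then, an error is reported.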
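The lookup order described above (cache, then NameServer by topic, then NameServer by default topic, then error) can be sketched like this. SketchRouteCache, the DEFAULT_TOPIC value, and the use of plain strings for queues are assumptions made for the example; the NameServer is modeled as a simple function.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical route cache; the NameServer is just a function from
// topic name to a list of queue descriptions (null or empty = unknown).
class SketchRouteCache {
    static final String DEFAULT_TOPIC = "DEFAULT_TOPIC"; // assumed name

    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServer;

    SketchRouteCache(Function<String, List<String>> nameServer) {
        this.nameServer = nameServer;
    }

    List<String> findQueues(String topic) {
        // 1. Use the cached route if it actually contains queues.
        List<String> queues = cache.get(topic);
        if (queues != null && !queues.isEmpty()) {
            return queues;
        }
        // 2. Ask the NameServer for this topic.
        queues = nameServer.apply(topic);
        // 3. Fall back to the default topic's route.
        if (queues == null || queues.isEmpty()) {
            queues = nameServer.apply(DEFAULT_TOPIC);
        }
        // 4. Give up if there is still no route.
        if (queues == null || queues.isEmpty()) {
            throw new IllegalStateException("No route info for topic: " + topic);
        }
        cache.put(topic, queues);
        return queues;
    }
}
```

After the first successful lookup, later sends to the same topic are served from the cache and do not touch the NameServer.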

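4.2 Selecting a Message Queue

A message queue is selected based on the route information; the returned message queues are sorted by broker and queue ID. Sending uses a retry mechanism: the producer loops over selecting a queue and sending the message, returns on success, and retries on failure. There are two ways to select a queue.

  • sendLatencyFaultEnable=false: the default mechanism
  • sendLatencyFaultEnable=true: enables the Broker fault-latency mechanism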
//MQFaultStrategy::selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
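The default mechanism (sendLatencyFaultEnable=false) amounts to a round-robin pick that tries to avoid the broker used by the previous, failed attempt. Below is a minimal sketch of that idea; SketchQueue and SketchQueueSelector are hypothetical names, and Math.floorMod is used here in place of the Math.abs plus negative check seen in the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal queue descriptor.
class SketchQueue {
    final String brokerName;
    final int queueId;

    SketchQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

// Round-robin selection that skips the broker that just failed.
class SketchQueueSelector {
    private final List<SketchQueue> queues;
    private final AtomicInteger counter = new AtomicInteger(0);

    SketchQueueSelector(List<SketchQueue> queues) {
        if (queues == null || queues.isEmpty()) {
            throw new IllegalArgumentException("at least one queue is required");
        }
        this.queues = new ArrayList<>(queues);
    }

    // Plain round-robin; floorMod keeps the index valid even after
    // the counter overflows to a negative value.
    SketchQueue selectOne() {
        int pos = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    // On a retry, prefer a queue on a different broker than last time.
    SketchQueue selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        for (int i = 0; i < queues.size(); i++) {
            SketchQueue mq = selectOne();
            if (!mq.brokerName.equals(lastBrokerName)) {
                return mq;
            }
        }
        // Every queue is on the failed broker: fall back to round-robin.
        return selectOne();
    }
}
```

On the first attempt lastBrokerName is null, so the call degrades to plain round-robin; only retries carry the name of the broker to avoid.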

4.3 Sending the Message

Core entry point of the send API: DefaultMQProducerImpl::sendKernelImpl

private SendResult sendKernelImpl(final Message msg,
                                    final MessageQueue mq,
                                    final CommunicationMode communicationMode,
                                    final SendCallback sendCallback,
                                    final TopicPublishInfo topicPublishInfo,
                                    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// omitted
}

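Parameters:

  • Message msg: the message to send
  • MessageQueue mq: the message queue the message will be sent to
  • CommunicationMode communicationMode: the send mode: SYNC, ASYNC, or ONEWAY
  • SendCallback sendCallback: the callback for asynchronous sends
  • TopicPublishInfo topicPublishInfo: the topic's route information
  • long timeout: the send timeout

Send steps:

  1. Look up the Broker's network address from the MessageQueue
  2. Assign the message a globally unique ID
  3. If send hooks are registered, run their pre-send logic
  4. Build the send request
  5. Transmit over the network in synchronous, asynchronous, or one-way mode, depending on the send mode
  6. If send hooks are registered, run their after-send logic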

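4.3.1 Synchronous Send

The MQ client's entry point for sending messages is MQClientAPIImpl::sendMessage.

Steps for a synchronous send, as handled on the Broker:

  1. Check whether the send request is valid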
//AbstractSendMessageProcessor::msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
    final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending message is forbidden");
        return response;
    }
    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
        log.warn(errorMsg);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorMsg);
        return response;
    }

    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
            }
        }

        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
            requestHeader.getTopic(),
            requestHeader.getDefaultTopic(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

        if (null == topicConfig) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicConfig =
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
                        topicSysFlag);
            }
        }

        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }

    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) {
        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
            queueIdInt,
            topicConfig.toString(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);

        return response;
    }
    return response;
}
  2. If the message's retry count exceeds the maximum allowed, the message goes to the DLQ (dead-letter queue)
  3. Call DefaultMessageStore::putMessage to store the message

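4.3.2 Asynchronous Send

An asynchronous send does not block waiting for the message server to return the send result. The caller only provides a callback, which the sending client invokes when the response arrives. Compared with synchronous send, this noticeably improves throughput on the sending side.

4.3.3 One-Way Send

A one-way send neither waits for a result nor takes a callback; the sender does not care whether the message was delivered. It works like an asynchronous send, except that the sender does nothing when a result comes back.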
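The difference between the three modes is only in how the caller treats the result. The sketch below models the network call as a CompletableFuture; SketchSender, its method names, and the "SEND_OK:" result string are all hypothetical and stand in for the real client API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical sender: one fake "network" thread, three ways to use it.
class SketchSender {
    private final ExecutorService network = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-network");
        t.setDaemon(true); // do not keep the JVM alive for the sketch
        return t;
    });

    // Pretend network round trip that always succeeds.
    private CompletableFuture<String> transmit(String body) {
        return CompletableFuture.supplyAsync(() -> "SEND_OK:" + body, network);
    }

    // SYNC: block until the result arrives or the timeout expires.
    String sendSync(String body, long timeoutMillis) {
        try {
            return transmit(body).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the send result", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    // ASYNC: return immediately; the callback fires when the result arrives.
    void sendAsync(String body, Consumer<String> onSuccess, Consumer<Throwable> onException) {
        transmit(body).whenComplete((result, error) -> {
            if (error != null) {
                onException.accept(error);
            } else {
                onSuccess.accept(result);
            }
        });
    }

    // ONEWAY: fire and forget; the result is deliberately ignored.
    void sendOneway(String body) {
        transmit(body);
    }

    void shutdown() {
        network.shutdown();
    }
}
```

One-way send trades delivery feedback for the lowest overhead, which is why it suits data such as logs where an occasional loss is acceptable.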

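4.3.4 Batch Send

A batch send packs several messages of the same topic together and sends them to the message server in one request, which reduces the number of network calls and improves transfer efficiency.

For a single message, the content is stored in body. For a batch, the content of several messages has to be stored in one body, so RocketMQ encodes the messages in a fixed format.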
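To make the idea of "several messages in one body" concrete, here is a deliberately simplified codec that writes each message as a 4-byte length followed by its bytes. This is an assumption for illustration only: SketchBatchCodec is a hypothetical class, and RocketMQ's real batch format carries more fields per message than this.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical length-prefixed batch codec (not the real wire format).
class SketchBatchCodec {

    // Packs every body as [int length][bytes] into one array.
    static byte[] encode(List<byte[]> bodies) {
        int total = 0;
        for (byte[] body : bodies) {
            total += Integer.BYTES + body.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(total);
        for (byte[] body : bodies) {
            buffer.putInt(body.length);
            buffer.put(body);
        }
        return buffer.array();
    }

    // Reads [int length][bytes] records until the array is exhausted.
    static List<byte[]> decode(byte[] batchBody) {
        ByteBuffer buffer = ByteBuffer.wrap(batchBody);
        List<byte[]> bodies = new ArrayList<>();
        while (buffer.hasRemaining()) {
            int length = buffer.getInt();
            byte[] body = new byte[length];
            buffer.get(body);
            bodies.add(body);
        }
        return bodies;
    }
}
```

Because every record states its own length, the receiver can split the combined body back into individual messages without any separator characters.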

[Figure: message format]

Batch send:

//DefaultMQProducer::send
public SendResult send(
    Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

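Send flow: on the sending side, the batch method is called first to wrap the messages into a MessageBatch object. MessageBatch holds a List<Message> messages internally, so from this point on a batch send follows the same flow as a single-message send.

[Figure: Message UML class diagram]

Tracing the call chain: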

//DefaultMQProducer::batch
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
    MessageBatch msgBatch;
    try {
        msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}


//DefaultMQProducerImpl::send
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

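5. Message Storage

Most business systems need the MQ to persist messages, which greatly improves the availability of the overall system.

Let's first look at how data flows through RocketMQ:

[Figure: data flow]

  • CommitLog: the message store file; messages of all topics are stored in CommitLog files
  • ConsumeQueue: the message consume queue; after a message reaches the CommitLog file, it is forwarded asynchronously to the consume queue, where consumers read it
  • IndexFile: the message index file, which mainly stores the mapping between message keys and offsets
  • Transaction state service: stores the transaction state of each message
  • Scheduled message service: each delay level maps to one consume queue, and the service stores the pull progress of the delay queues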
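To show how a ConsumeQueue can stay small while the CommitLog holds the full messages, the sketch below encodes a fixed-size index entry that points back into the CommitLog. The 20-byte layout (8-byte CommitLog offset, 4-byte message size, 8-byte tag hash code) is stated here as an assumption of the sketch, and SketchConsumeQueueEntry is a hypothetical class.

```java
import java.nio.ByteBuffer;

// Hypothetical fixed-size ConsumeQueue entry:
// [long commitLogOffset][int size][long tagsCode] = 20 bytes.
class SketchConsumeQueueEntry {
    static final int UNIT_SIZE = Long.BYTES + Integer.BYTES + Long.BYTES;

    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        return ByteBuffer.allocate(UNIT_SIZE)
            .putLong(commitLogOffset)
            .putInt(size)
            .putLong(tagsCode)
            .array();
    }

    // Fixed-size entries allow direct addressing: entry N starts at N * UNIT_SIZE.
    static long commitLogOffsetAt(byte[] queueFile, int logicIndex) {
        return ByteBuffer.wrap(queueFile).getLong(logicIndex * UNIT_SIZE);
    }

    static int sizeAt(byte[] queueFile, int logicIndex) {
        return ByteBuffer.wrap(queueFile).getInt(logicIndex * UNIT_SIZE + Long.BYTES);
    }
}
```

A consumer that knows its logical position in the queue can jump straight to the matching entry and then read the full message from the CommitLog at the recorded offset.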

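RocketMQ's storage architecture:

[Figure: RocketMQ storage architecture]

Message store implementation class: org.apache.rocketmq.store.DefaultMessageStore

[Figure: DefaultMessageStore]

Core attributes:

  • MessageStoreConfig messageStoreConfig: message store configuration
  • CommitLog commitLog: the implementation class for CommitLog file storage
  • ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable: cache table of consume queues, grouped by topic
  • FlushConsumeQueueService flushConsumeQueueService: flush thread for ConsumeQueue files
  • CleanCommitLogService cleanCommitLogService: service that cleans up CommitLog files
  • CleanConsumeQueueService cleanConsumeQueueService: service that cleans up ConsumeQueue files
  • IndexService indexService: the index file implementation class
  • AllocateMappedFileService allocateMappedFileService: MappedFile allocation service
  • ReputMessageService reputMessageService: CommitLog message dispatch; builds the ConsumeQueue and IndexFile files from the CommitLog file
  • HAService haService: the store's HA mechanism
  • TransientStorePool transientStorePool: pool of off-heap (direct) memory buffers for messages
  • MessageArrivingListener messageArrivingListener: message-arrival listener used by long-polling message pulls
  • BrokerConfig brokerConfig: Broker configuration
  • StoreCheckpoint storeCheckpoint: file flush checkpoint
  • LinkedList<CommitLogDispatcher> dispatcherList: dispatchers for CommitLog forwarding requests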

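5.1 Message Store Flow on Send

Entry point for storing a message: org.apache.rocketmq.store.DefaultMessageStore::putMessage

  1. If the Broker is shutting down, the Broker is a SLAVE, or the store is currently not writable, the write is rejected. The write is also rejected if the topic is longer than 256 characters or the message properties are longer than 65536 characters
  2. Validate the message delay level
  3. Get the CommitLog file that can currently be written to
  4. Before writing to the CommitLog, acquire putMessageLock; in other words, messages are appended to the CommitLog file serially
  5. Set the message's store timestamp
  6. Append the message to the MappedFile
  7. Create the globally unique message ID
  8. Get the message's offset in its message queue
  9. Compute the total message length from the body length, topic length, and properties length, according to the message store format
  10. If the message length plus END_FILE_MIN_BLANK_LENGTH exceeds the free space left in the CommitLog file, return AppendMessageStatus.END_OF_FILE; the Broker then creates a new CommitLog file to store the message
  11. Write the message content into the ByteBuffer, then create the AppendMessageResult
  12. Update the message queue's logical offset
  13. Once the append logic is done, release putMessageLock
  14. DefaultAppendMessageCallback::doAppend only appends the message to memory; the in-memory data still has to be persisted to disk, using either synchronous or asynchronous flush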
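Steps 4, 10, 11 and 13 (serial append under a lock, with an end-of-file check) can be sketched as below. SketchCommitLogFile is a hypothetical, heavily simplified class: it stores only a length prefix plus the body, uses a heap ByteBuffer instead of a mapped file, and the value 8 for END_FILE_MIN_BLANK_LENGTH is an assumption of the sketch.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single CommitLog file with serialized appends.
class SketchCommitLogFile {
    enum Status { PUT_OK, END_OF_FILE }

    // Space reserved at the end of the file for an end marker (assumed value).
    static final int END_FILE_MIN_BLANK_LENGTH = 8;

    private final ByteBuffer buffer;
    private final ReentrantLock putMessageLock = new ReentrantLock();

    SketchCommitLogFile(int fileSize) {
        this.buffer = ByteBuffer.allocate(fileSize);
    }

    Status append(byte[] body) {
        putMessageLock.lock(); // appends happen one at a time
        try {
            int msgLen = Integer.BYTES + body.length; // simplified record format
            if (msgLen + END_FILE_MIN_BLANK_LENGTH > buffer.remaining()) {
                // Not enough room: the caller would roll over to a new file.
                return Status.END_OF_FILE;
            }
            buffer.putInt(msgLen);
            buffer.put(body);
            return Status.PUT_OK;
        } finally {
            putMessageLock.unlock(); // always release, even on failure
        }
    }

    // Current write position, read under the same lock for a consistent view.
    int wrotePosition() {
        putMessageLock.lock();
        try {
            return buffer.position();
        } finally {
            putMessageLock.unlock();
        }
    }
}
```

Keeping the critical section down to the length check and two buffer writes is what makes a single lock acceptable on such a hot path.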

Simplified, this corresponds to the following sequence diagram (figure not included).

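5.2 Memory-Mapping Flow

RocketMQ uses memory-mapped files to improve I/O performance. For CommitLog, ConsumeQueue, and IndexFile alike, each individual file has a fixed length. When a file is full, a new file is created, and its name is the global physical offset of the first message it contains.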
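The naming rule can be sketched in a few lines. The 20-digit zero padding and the 1 GiB file size used in the usage note are assumptions of this sketch, and SketchFileNames is a hypothetical helper.

```java
import java.util.Locale;

// Hypothetical helper mapping physical offsets to file names and back.
class SketchFileNames {

    // The file name is the starting offset, left-padded with zeros to 20 digits.
    static String offsetToFileName(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must not be negative");
        }
        return String.format(Locale.ROOT, "%020d", offset);
    }

    static long fileNameToOffset(String fileName) {
        return Long.parseLong(fileName);
    }

    // Name of the file that contains the given physical offset.
    static String fileNameFor(long physicalOffset, long fileSize) {
        long fileStart = physicalOffset - (physicalOffset % fileSize);
        return offsetToFileName(fileStart);
    }
}
```

With a file size of 1073741824 bytes, offset 0 falls in the file named 00000000000000000000 and offset 1073741829 falls in the file named 00000000001073741824, so locating a message by physical offset needs only integer arithmetic.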

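[Figure: memory-mapping flow]

Steps:

  1. The memory-mapped file MappedFile is created by AllocateMappedFileService
  2. Creating a MappedFile follows a typical producer-consumer model
  3. When MappedFileQueue calls getLastMappedFile to get a MappedFile, it puts a request into a queue
  4. The AllocateMappedFileService thread keeps watching the queue and creates a MappedFile object whenever a request arrives
  5. Finally the MappedFile is warmed up, which calls the force and mlock methods under the hood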
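The core of a MappedFile is a file mapped into memory with FileChannel.map. The sketch below maps a temporary file, touches one byte per page as a simple warm-up, writes data, and forces it to disk. SketchMappedFile is a hypothetical class; the 4096-byte page size is an assumption, and mlock is left out because plain Java has no API for it.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical minimal memory-mapped file demo.
class SketchMappedFile {
    static final int OS_PAGE_SIZE = 4096; // assumed page size

    // Maps a temp file of fileSize bytes, writes data, and reads it back.
    static byte[] writeAndReadBack(int fileSize, byte[] data) {
        if (data.length > fileSize) {
            throw new IllegalArgumentException("data does not fit in the file");
        }
        try {
            Path file = Files.createTempFile("sketch-mapped", ".log");
            file.toFile().deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
                 FileChannel channel = raf.getChannel()) {
                // Mapping in READ_WRITE mode grows the file to fileSize.
                MappedByteBuffer mapped =
                    channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);

                // Warm-up: touch one byte per page so pages are allocated early.
                for (int i = 0; i < fileSize; i += OS_PAGE_SIZE) {
                    mapped.put(i, (byte) 0);
                }
                mapped.force();

                // Writing goes to memory first; force() pushes it to disk.
                mapped.position(0);
                mapped.put(data);
                mapped.force();

                byte[] back = new byte[data.length];
                mapped.position(0);
                mapped.get(back);
                return back;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Warming up ahead of time moves the cost of page faults out of the message write path, which is why the allocation runs in a separate service thread.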

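5.3 Flush Flow

After MappedFile's appendMessage is called, the message has only been written into a ByteBuffer, that is, into memory; it has not reached the disk yet. Persisting it requires flushing memory to disk. For the CommitLog, RocketMQ provides two flush modes.

[Figure: flush flow]

  • Messages sent by the producer to the broker are saved in the MappedFile and then synchronized to disk by the flush mechanism
  • Flushing is either synchronous or asynchronous
  • With asynchronous flush, a background thread runs at a fixed interval
  • Synchronous flush also follows a producer-consumer model. After saving the message to the MappedFile, the broker creates a GroupCommitRequest, puts it into a list, and blocks. A background thread takes requests from the list, flushes to disk, and notifies the waiting thread once the flush succeeds.

Synchronous flush (CommitLog.java)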

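// Wraps a single flush request
public static class GroupCommitRequest {
    // Offset up to which the CommitLog must be flushed for this request to complete
    private final long nextOffset;
    // Latch the requesting thread waits on until the flush finishes
    private final CountDownLatch countDownLatch = new CountDownLatch(1);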
    private volatile boolean flushOK = false;


    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }


    public long getNextOffset() {
        return nextOffset;
    }

    // Wake the waiting thread once the flush is done
    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }


    public boolean waitForFlush(long timeout) {
        try {
            this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }
}

/**
 * GroupCommit Service:
 * flushes requests to disk in batches.
 */
class GroupCommitService extends FlushCommitLogService {
    // List that receives incoming flush requests (write side)
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    // List the flush thread drains while flushing memory to disk (read side)
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

    // Add a flush request
    public void putRequest(final GroupCommitRequest request) {
        synchronized (this) {
            // Append to the write list
            this.requestsWrite.add(request);
            // Wake the flush thread if it has not been notified yet
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }
    }

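    // Swap the read and write lists so the flush thread can drain one list
    // while new requests keep arriving on the other, without holding a lock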
    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }


    private void doCommit() {
        // The read list is not empty
        if (!this.requestsRead.isEmpty()) {
            // Handle each pending request
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; (i < 2) && !flushOK; i++) {
                    // Done if the flushed position already covers the requested offset
                    flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());
                    // Not flushed far enough yet (flushOK is false): flush again
                    if (!flushOK) {
                        CommitLog.this.mapedFileQueue.commit(0);
                    }
                }
                // Wake the waiting thread with the flush result
                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            // Clear the read list
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mapedFileQueue.commit(0);
        }
    }


    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            try {
                this.waitForRunning(0);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        // On a normal shutdown, flush whatever is still pending
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }

        synchronized (this) {
            this.swapRequests();
        }

        this.doCommit();

        CommitLog.log.info(this.getServiceName() + " service end");
    }
}

Asynchronous flush (CommitLog.java)

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    // Keep polling until the service is stopped
    while (!this.isStoped()) {
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum number of dirty pages required before a flush is performed
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

        int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        // Once the thorough-flush interval has elapsed, force a flush regardless
        // of how many dirty pages there are (least pages = 0)
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = ((printTimes++ % 10) == 0);
        }

        try {
            // Either sleep for the fixed interval, or wait so that a wakeup can cut the wait short
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }
            // Flush to disk
            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RetryTimesOver && !result; i++) {
        result = CommitLog.this.mapedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}
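The asynchronous loop above flushes only when enough dirty pages have piled up, except that a thorough-flush interval periodically drops that threshold to zero. A minimal sketch of that decision follows; SketchFlushPolicy, its method names, and the 4096-byte page size are assumptions made for this example.

```java
// Hypothetical helper capturing the "is a flush worthwhile now?" decision.
class SketchFlushPolicy {
    static final int OS_PAGE_SIZE = 4096; // assumed page size

    // With a positive threshold, flush only when at least that many whole
    // pages separate the written and flushed positions; with a threshold
    // of 0, flush as soon as anything is unflushed.
    static boolean isAbleToFlush(long flushedPosition, long wrotePosition, int flushLeastPages) {
        if (flushLeastPages > 0) {
            long dirtyPages = (wrotePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE);
            return dirtyPages >= flushLeastPages;
        }
        return wrotePosition > flushedPosition;
    }

    // Once the thorough interval has elapsed since the last forced flush,
    // the page threshold is ignored for this round.
    static int effectiveLeastPages(long nowMillis, long lastFlushTimestamp,
                                   long thoroughIntervalMillis, int configuredLeastPages) {
        boolean thoroughFlushDue = nowMillis >= lastFlushTimestamp + thoroughIntervalMillis;
        return thoroughFlushDue ? 0 : configuredLeastPages;
    }
}
```

The page threshold batches small writes into fewer disk operations, while the thorough interval bounds how long a small amount of data can sit unflushed.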

6. Summary & References

Summary

Message send flow chart:

[Figure: message send flow chart]

Message store flow chart:

[Figure: message store flow chart]

References