【源码解析10】消费过程源码解析

47 阅读2分钟

前置知识

消息的索引ConsumeQueue

要了解消费的过程,我们必须先了解下消息的RocketMQ消息的索引ConsumeQueue
一条ConsumeQueue由20个字节组成,分为以下3个部分:

  1. 物理偏移量(占8个字节,即在CommitLog文件当中的实际偏移量)
  2. 消息体长度(占4个字节,代表索引指向的这条Message的长度)
  3. Tag哈希值(占8个字节,Consumer可通过tag来进行消息过滤)
    在broker的存储时,一条ConsumeQueue和一条Message一一对应

ConsumQueue存储细节

为了防止单个ConsumQueue文件过大导致索引消息降低,ConsumeQueue单个文件只能存储30万条记录
该数值被写死在org.apache.rocketmq.store.config.MessageStoreConfig配置类中 image.png 文件的命名和CommitLog是类似的,会按照起始偏移量命名

消息的索引IndexFile

该索引主要用来通过Key或者时间区间来查询message的索引,和ConsumeQueue一样,也会有多个这样的索引文件,不过该索引大小为400M,单个文件可存储2000万条记录

源码位置

我们进入方法org.apache.rocketmq.example.simple.PullConsumer#main中,去看消费过程

image.png 我们可以看到start方法和Producer的start方法如出一辙,之所以这样设计是因为Consume的过程刚好是produce的逆向操作,所以其中很多方法是可以共用的 image.png

数据拉取

数据拉取的主要是通过PullMessageService进行拉取,启动的方法如下 org.apache.rocketmq.client.impl.consumer.PullMessageService#run image.png 我们继续顺着代码往里面看,可以看到最终调用的是方法org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 在该方法中通过调用Broker的api,实现消息的拉取
image.png 异步调用的broker,并返回Response后,会触发Consumer会实现的PullCallback的函数
PullCallback有成功和异常的处理逻辑 image.png 成功后会把消息存储在本地队列ProcessQueue中(org.apache.rocketmq.client.impl.consumer.ProcessQueue)
它是MessageQueue的快照,会把消息存储到msgTreeMap这个map中(TreeMap使用红黑树实现的有序Map)

image.png 最后Consumer通过线程池,并发的拉取ProcessQueue中的数据,进行业务处理 image.png