前置知识
消息的索引ConsumeQueue
要了解消费的过程,我们必须先了解下消息的RocketMQ消息的索引ConsumeQueue
一条ConsumeQueue由20个字节组成,分为以下3个部分:
- 物理偏移量(占8个字节,即在CommitLog文件当中的实际偏移量)
- 消息体长度(占4个字节,代表索引指向的这条Message的长度)
- Tag哈希值(占8个字节,Consumer可通过tag来进行消息过滤)
在broker的存储时,一条ConsumeQueue和一条Message一一对应
ConsumQueue存储细节
为了防止单个ConsumQueue文件过大导致索引消息降低,ConsumeQueue单个文件只能存储30万条记录
该数值被写死在org.apache.rocketmq.store.config.MessageStoreConfig配置类中
文件的命名和CommitLog是类似的,会按照起始偏移量命名
消息的索引IndexFile
该索引主要用来通过Key或者时间区间来查询message的索引,和ConsumeQueue一样,也会有多个这样的索引文件,不过该索引大小为400M,单个文件可存储2000万条记录
源码位置
我们进入方法org.apache.rocketmq.example.simple.PullConsumer#main中,去看消费过程
我们可以看到start方法和Producer的start方法如出一辙,之所以这样设计是因为Consume的过程刚好是produce的逆向操作,所以其中很多方法是可以共用的
数据拉取
数据拉取的主要是通过PullMessageService进行拉取,启动的方法如下
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
我们继续顺着代码往里面看,可以看到最终调用的是方法org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
在该方法中通过调用Broker的api,实现消息的拉取
异步调用的broker,并返回Response后,会触发Consumer会实现的PullCallback的函数
PullCallback有成功和异常的处理逻辑
成功后会把消息存储在本地队列ProcessQueue中(org.apache.rocketmq.client.impl.consumer.ProcessQueue)
它是MessageQueue的快照,会把消息存储到msgTreeMap这个map中(TreeMap使用红黑树实现的有序Map)
最后Consumer通过线程池,并发的拉取ProcessQueue中的数据,进行业务处理