A look at RocketMQ's updateConsumeOffsetToBroker

This article takes a look at updateConsumeOffsetToBroker in RocketMQ, as implemented in rocketmq-client 4.5.2.

updateConsumeOffsetToBroker

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java

public class RemoteBrokerOffsetStore implements OffsetStore {

    //......

    /**
     * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
     * here need to be optimized.
     */
    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {

            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }

        if (findBrokerResult != null) {
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);

            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }

    //......
}
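  • RemoteBrokerOffsetStore.updateConsumeOffsetToBroker first calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) to obtain a findBrokerResult
  • If that returns null, it calls mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()) to refresh the route info, then calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) a second time
  • If findBrokerResult is non-null, it builds an UpdateConsumerOffsetRequestHeader (topic, consumer group, queue id, commit offset) and, depending on isOneway, calls either mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway or mQClientFactory.getMQClientAPIImpl().updateConsumerOffset, each with a 5-second timeout; if findBrokerResult is still null, the method throws MQClientException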
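The lookup, refresh-on-miss, and second-lookup flow described above can be sketched without any RocketMQ dependency. Everything in this block is a hypothetical stand-in: OffsetCommitSketch, nameServerView, refreshRoutes, and the sent list are illustration-only names, and the real client sends a remoting request where this sketch merely records a string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the client side of an offset commit; not a RocketMQ class.
final class OffsetCommitSketch {
    // Local cache of brokerName -> address, playing the role of the client's broker table.
    private final Map<String, String> addrCache = new HashMap<>();
    // What a refresh "from the name server" would return in this toy setup (assumption).
    private final Map<String, String> nameServerView;
    // Records what would have been sent, so the control flow can be inspected.
    final List<String> sent = new ArrayList<>();
    // Counts how often the cache had to be refreshed.
    int refreshCount = 0;

    OffsetCommitSketch(Map<String, String> nameServerView) {
        this.nameServerView = nameServerView;
    }

    // Stand-in for updateTopicRouteInfoFromNameServer: copies the latest view into the cache.
    private void refreshRoutes() {
        refreshCount++;
        addrCache.putAll(nameServerView);
    }

    // Mirrors the control flow: look up, refresh once on a miss, look up again, then send or fail.
    void commit(String brokerName, long offset, boolean isOneway) {
        String addr = addrCache.get(brokerName);
        if (addr == null) {
            refreshRoutes();
            addr = addrCache.get(brokerName);
        }
        if (addr == null) {
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        // The real method picks updateConsumerOffsetOneway or updateConsumerOffset here.
        sent.add((isOneway ? "oneway" : "sync") + " -> " + addr + " offset=" + offset);
    }
}
```

In this sketch the first commit for a known broker triggers exactly one refresh, later commits hit the cache, and a broker name that is still unknown after the refresh ends in an exception, which matches the shape of the original method.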

findBrokerAddressInAdmin

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

    //......

    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<Long, String> entry : map.entrySet()) {
                Long id = entry.getKey();
                brokerAddr = entry.getValue();
                if (brokerAddr != null) {
                    found = true;
                    if (MixAll.MASTER_ID == id) {
                        slave = false;
                    } else {
                        slave = true;
                    }
                    break;

                }
            } // end of for
        }

        if (found) {
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }

        return null;
    }

    //......
}
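  • findBrokerAddressInAdmin first fetches the brokerId-to-address map for the given brokerName from brokerAddrTable, then iterates over it. At the first entry whose brokerAddr is non-null it sets found to true, sets slave to false if the brokerId equals MixAll.MASTER_ID and to true otherwise, and breaks out of the loop. Because the loop stops at the first non-null address, the result is not necessarily the master. If found is true it constructs and returns a FindBrokerResult; otherwise it returns null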
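The "first non-null address wins" behaviour can be illustrated with a small, dependency-free sketch. FindBrokerSketch, Result, and findFirst are hypothetical names; MASTER_ID mirrors MixAll.MASTER_ID (0), and the broker version that the real FindBrokerResult also carries is left out. A LinkedHashMap is used in the usage example only to make the iteration order explicit; the real brokerAddrTable values are plain HashMaps.

```java
import java.util.Map;

// Hypothetical, simplified mirror of the lookup loop; not a RocketMQ class.
final class FindBrokerSketch {
    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    // Simplified result: address plus slave flag (the real result also has a version).
    static final class Result {
        final String addr;
        final boolean slave;

        Result(String addr, boolean slave) {
            this.addr = addr;
            this.slave = slave;
        }
    }

    // Returns the first entry with a non-null address, flagged as slave unless its id is MASTER_ID.
    static Result findFirst(Map<Long, String> addrsById) {
        if (addrsById == null || addrsById.isEmpty()) {
            return null;
        }
        for (Map.Entry<Long, String> entry : addrsById.entrySet()) {
            String addr = entry.getValue();
            if (addr != null) {
                // Long key is unboxed, so this is a numeric comparison.
                return new Result(addr, entry.getKey() != MASTER_ID);
            }
        }
        return null;
    }
}
```

For example, with a LinkedHashMap populated as `1L -> "10.0.0.2:10911"` followed by `0L -> "10.0.0.1:10911"`, findFirst returns the slave address with slave set to true, since that entry is visited first. An entry with a null address is skipped, and an empty or missing map yields null.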

Summary

  • RemoteBrokerOffsetStore.updateConsumeOffsetToBroker first calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) to obtain a findBrokerResult
  • If that returns null, it calls mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()) and then calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) again
  • If findBrokerResult is non-null, it builds an UpdateConsumerOffsetRequestHeader and calls either mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway or mQClientFactory.getMQClientAPIImpl().updateConsumerOffset; otherwise it throws MQClientException
