RocketMQ源码环境搭建

657 阅读9分钟

启程:

RocketMQ中文资料:(若英文很好可以直接去看英文版本)

虽然以上基于RocketMQ 3版本,也有很好的参考价值。

从官方仓库 github.com/apache/rock… Fork 出属于自己的仓库。

搭建调试环境的过程:

  1. 启动 RocketMQ Namesrv
  2. 启动 RocketMQ Broker
  3. 启动 RocketMQ Producer
  4. 启动 RocketMQ Consumer

本系列使用的 RocketMQ 版本是 4.5.1 。

1.启动RocketMQ NameSrv

打开 org.apache.rocketmq.namesrv.NameServerInstanceTest 单元测试类,参考 #startup() 方法,我们编写 #main(String[] args) 静态方法,代码如下:

// NameServerInstanceTest.java

public static void main(String[] args) throws Exception {
    // NamesrvConfig 配置
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // NettyServerConfig 配置
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876); // 设置端口
    // 创建 NamesrvController 对象,并启动
    NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
    namesrvController.initialize();
    namesrvController.start();
    // sleep
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
}

然后,右键运行,RocketMQ Namesrv 就启动完成。输出日志如下:

14:42:23.154 [NettyEventExecutor] INFO  RocketmqRemoting - NettyEventExecutor service started
14:42:23.154 [FileWatchService] INFO  RocketmqCommon - FileWatchService service started

2.启动RockeMQ Broker

打开 org.apache.rocketmq.broker.BrokerControllerTest 单元测试类,参考 #testBrokerRestart() 方法,我们编写 #main(String[] args) 方法,代码如下:

// BrokerControllerTest.java

public static void main(String[] args) throws Exception {
    // 设置版本号
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    // NettyServerConfig 配置
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(10911);
    // BrokerConfig 配置
    final BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setBrokerName("broker-a");
    brokerConfig.setNamesrvAddr("127.0.0.1:9876");
    // MessageStoreConfig 配置
    final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    messageStoreConfig.setDeleteWhen("04");
    messageStoreConfig.setFileReservedTime(48);
    messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
    messageStoreConfig.setDuplicationEnable(false);
    
    // 创建 BrokerController 对象,并启动
    BrokerController brokerController = new BrokerController(
            brokerConfig, 
            nettyServerConfig, 
            new NettyClientConfig(), 
            messageStoreConfig);
    brokerController.initialize();
    brokerController.start();
    
    System.out.println("启动了");
    // sleep
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
}

brokerControllerTest控制台输出:

启动了

NameServerInstanceTests控制台输出:

14:43:38.523 [RemotingExecutorThread_1] INFO  RocketmqNamesrv - new topic registered, HZO-PC10068847 QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO  RocketmqNamesrv - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO  RocketmqNamesrv - new topic registered, TopicTest QueueData [brokerName=broker-a, readQueueNums=4, writeQueueNums=4, perm=6, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO  RocketmqNamesrv - new topic registered, broker-a QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO  RocketmqNamesrv - new topic registered, TBW102 QueueData [brokerName=broker-a, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO  RocketmqNamesrv - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
14:43:38.523 [RemotingExecutorThread_1] INFO  RocketmqNamesrv - new topic registered, DefaultCluster QueueData [brokerName=broker-a, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
14:43:38.524 [RemotingExecutorThread_1] INFO  RocketmqNamesrv - new broker registered, 10.111.23.56:10911 HAServer: 10.111.23.56:10912

3.启动 RocketMQ Producer

打开 org.apache.rocketmq.example.quickstart.Producer 示例类,代码如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        producer.setNamesrvAddr("127.0.0.1:9876");  //<x>
        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}

在处添加设置NameServer地址producer.setNamesrvAddr("127.0.0.1:9876") 代码块,指明 Producer 使用的 RocketMQ Namesrv。控制台输出:

SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C103DF, offsetMsgId=0A6F173800002A9F000000003A31CEC2, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1277979]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C203E0, offsetMsgId=0A6F173800002A9F000000003A31CF76, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1277981]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C203E1, offsetMsgId=0A6F173800002A9F000000003A31D02A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1277975]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C303E2, offsetMsgId=0A6F173800002A9F000000003A31D0DE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1277978]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C303E3, offsetMsgId=0A6F173800002A9F000000003A31D192, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1277980]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C303E4, offsetMsgId=0A6F173800002A9F000000003A31D246, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1277982]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C303E5, offsetMsgId=0A6F173800002A9F000000003A31D2FA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1277976]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C403E6, offsetMsgId=0A6F173800002A9F000000003A31D3AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1277979]
SendResult [sendStatus=SEND_OK, msgId=0A6F1738844418B4AAC2506AF0C403E7, offsetMsgId=0A6F173800002A9F000000003A31D462, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=1277981]
14:46:25.763 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:46:25.764 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[10.111.23.56:10911] result: true

4. 启动 RocketMQ Consumer

打开 org.apache.rocketmq.example.quickstart.Consumer 示例类,代码如下:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        consumer.setNamesrvAddr("127.0.0.1:9876"); //<x>
        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

在 处增加 consumer.setNamesrvAddr("127.0.0.1:9876") 代码块,指明 Consumer 使用的 RocketMQ Namesrv。控制台输出:

ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=1277974, sysFlag=0, bornTimestamp=1563259585728, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585729, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31CD5A, commitLogOffset=976342362, bodyCRC=1058180698, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277977, CONSUME_START_TIME=1563259835201, UNIQ_KEY=0A6F1738844418B4AAC2506AF0C003DD, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 57], transactionId='null'}]] 
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=1277972, sysFlag=0, bornTimestamp=1563259585725, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585725, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31C7BA, commitLogOffset=976340922, bodyCRC=835257960, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277977, CONSUME_START_TIME=1563259835201, UNIQ_KEY=0A6F1738844418B4AAC2506AF0BD03D5, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 49], transactionId='null'}]] 
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=1277971, sysFlag=0, bornTimestamp=1563259585723, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585723, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31C4EA, commitLogOffset=976340202, bodyCRC=1597161362, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277977, CONSUME_START_TIME=1563259835201, UNIQ_KEY=0A6F1738844418B4AAC2506AF0BB03D1, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 55], transactionId='null'}]] 
ConsumeMessageThread_18 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=1277978, sysFlag=0, bornTimestamp=1563259585727, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585727, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31CBF2, commitLogOffset=976342002, bodyCRC=1487577949, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277982, CONSUME_START_TIME=1563259835210, UNIQ_KEY=0A6F1738844418B4AAC2506AF0BF03DB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 55], transactionId='null'}]] 
ConsumeMessageThread_19 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=1277970, sysFlag=0, bornTimestamp=1563259585709, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585710, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31B572, commitLogOffset=976336242, bodyCRC=51035196, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277982, CONSUME_START_TIME=1563259835209, UNIQ_KEY=0A6F1738844418B4AAC2506AF0AD03BB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 53, 53], transactionId='null'}]] 
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=1277968, sysFlag=0, bornTimestamp=1563259585705, bornHost=/10.111.23.56:58590, storeTimestamp=1563259585705, storeHost=/10.111.23.56:10911, msgId=0A6F173800002A9F000000003A31AFD2, commitLogOffset=976334802, bodyCRC=1948249169, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1277982, CONSUME_START_TIME=1563259835208, UNIQ_KEY=0A6F1738844418B4AAC2506AF0A903B3, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 55], transactionId='null'}]]

5.RocketMQ 之 Namesrv 小结:

KVConfigManager:KV配置管理

  • key-value配置管理,增删改查

RouteInfoManager:路由信息管理

  • 注册Broker,提供Broker信息(名字、角色编号、地址、集群名)
  • 注册Topic,提供Topic信息(Topic名、读写权限、队列情况)