RocketMQ Quick Start

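Quick Start

This quick start guide gives detailed instructions for setting up the RocketMQ messaging system on a local machine and for sending and receiving basic messages.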

Prerequisites

  1. 64-bit operating system
  2. 64-bit JDK 1.8+
  3. Maven 3.2.x+
  4. Git
  5. 4 GB+ of free disk space for the Broker server

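Download and Build the Release

The source code of the 4.4.0 release can be downloaded here, and a binary release is also available here. Run the following commands to unpack the 4.4.0 source release and build the binary artifact.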

> unzip rocketmq-all-4.4.0-source-release.zip
> cd rocketmq-all-4.4.0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/apache-rocketmq

The mvn -Prelease-all -DskipTests clean install -U command builds the downloaded source package and generates a target directory under distribution; the built package is located there.

Start the Name Server

charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqnamesrv &
[1] 5273
nohup: ignoring input and appending output to 'nohup.out'
charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ tail -f ~/logs/rocketmqlogs/namesrv.log
2019-03-31 19:24:10 INFO main - tls.client.keyPath = null
2019-03-31 19:24:10 INFO main - tls.client.keyPassword = null
2019-03-31 19:24:10 INFO main - tls.client.certPath = null
2019-03-31 19:24:10 INFO main - tls.client.authServer = false
2019-03-31 19:24:10 INFO main - tls.client.trustCertPath = null
2019-03-31 19:24:11 INFO main - Using OpenSSL provider
2019-03-31 19:24:11 INFO main - SSLContext created for server
2019-03-31 19:24:12 INFO NettyEventExecutor - NettyEventExecutor service started
2019-03-31 19:24:12 INFO FileWatchService - FileWatchService service started
2019-03-31 19:24:12 INFO main - The Name Server boot success. serializeType=JSON
2019-03-31 19:25:11 INFO NSScheduledThread1 - --------------------------------------------------------
2019-03-31 19:25:11 INFO NSScheduledThread1 - configTable SIZE: 0

When the log shows The Name Server boot success, the Name Server has started successfully; the serialization type is JSON.

Start the Broker

charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqbroker -n localhost:9876 &
[1] 5784
nohup: ignoring input and appending output to 'nohup.out'
charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ tail -f ~/logs/rocketmqlogs/broker.log
2019-03-31 19:41:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:41:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 19:41:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:42:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 19:42:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK

The log shows that the broker has registered with the name server. You can also use the jps command to check whether both services are running.

charse@charse-thinkpad:~$ jps
12128 Main
12549 Jps
5279 NamesrvStartup
5791 BrokerStartup

As shown, both the NameServer and the Broker have started successfully, so we can move on to simulating a producer and a consumer.

Broker

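When a message is produced programmatically to a topic (TopicTest) that does not yet exist, the Broker creates the topic (TopicTest) automatically with the default configuration. The producer address shown in the log is my local LAN address (192.168.3.16).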

2019-03-31 20:37:43 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /192.168.3.16:47538
2019-03-31 20:37:43 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[192.168.3.16:47538]
2019-03-31 20:37:43 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: producer1 channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer[producer1] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863927]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer group[producer1] from groupChannelTable
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863933]
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 20:37:52 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes

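When a producer is created, the log shows that a CLIENT_INNER_PRODUCER producer is registered in groupChannelTable first, followed by the client's own producer, producer1; the entries also show some details about the producer. When the producer shuts down, the client's producer1 is unregistered first, and then CLIENT_INNER_PRODUCER is removed from groupChannelTable. The broker also re-registers itself with the name server periodically (roughly every 30 seconds in the log above).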
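
The registration and removal sequence in the broker log can be pictured as a small table keyed by producer group. This is only an illustrative sketch built on plain Java collections; the class GroupChannelTableSketch and its methods are hypothetical names and are not the broker's actual implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GroupChannelTableSketch {
    // group name -> client ids currently registered under that group
    private final Map<String, Set<String>> table = new HashMap<>();

    // Registers a client under a producer group, creating the group on first use.
    void register(String group, String clientId) {
        table.computeIfAbsent(group, g -> new HashSet<>()).add(clientId);
    }

    // Removes a client; drops the group once its last client is gone.
    // Returns true when the whole group was removed.
    boolean unregister(String group, String clientId) {
        Set<String> clients = table.get(group);
        if (clients == null) {
            return false;
        }
        clients.remove(clientId);
        if (clients.isEmpty()) {
            table.remove(group);
            return true;
        }
        return false;
    }

    boolean hasGroup(String group) {
        return table.containsKey(group);
    }

    public static void main(String[] args) {
        GroupChannelTableSketch t = new GroupChannelTableSketch();
        String clientId = "192.168.3.16@14206"; // client id taken from the log
        t.register("CLIENT_INNER_PRODUCER", clientId);
        t.register("producer1", clientId);
        System.out.println(t.unregister("producer1", clientId)); // true
        System.out.println(t.hasGroup("CLIENT_INNER_PRODUCER")); // true
    }
}
```

In the log, removing the only client of producer1 is followed by a line reporting that the producer group itself was removed; the boolean return value models that second step.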

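When a client creates a consumer, the broker log below shows that a subscription group is created automatically along with its configuration. After the new consumer connects, its topics are added to the corresponding group: the topic you subscribed to (TopicTest) and a retry topic named %RETRY% followed by the consumer group name, each with its own subscription. A new producer, CLIENT_INNER_PRODUCER, is also created.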

2019-03-31 20:38:22 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - auto create a subscription group, SubscriptionGroupConfig [groupName=consumer1, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - create new topic TopicConfig [topicName=%RETRY%consumer1, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
2019-03-31 20:38:35 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915211]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]]] 192.168.3.16:48236
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915412]
2019-03-31 20:38:35 INFO HeartbeatThread_3 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915833, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915843, expressionType=TAG]
2019-03-31 20:38:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 20:39:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:39:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:40:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
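
The retry topic in the log follows a simple naming rule: the %RETRY% prefix plus the consumer group name. A minimal sketch of that rule is shown here; the class and method names are hypothetical helpers written for illustration, not part of the RocketMQ client API.

```java
public class RetryTopicName {
    // Prefix observed in the broker log ("%RETRY%consumer1").
    static final String RETRY_PREFIX = "%RETRY%";

    // Builds the retry topic name for a consumer group.
    static String retryTopicFor(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // Returns true if the topic name looks like a retry topic.
    static boolean isRetryTopic(String topic) {
        return topic != null && topic.startsWith(RETRY_PREFIX);
    }

    public static void main(String[] args) {
        System.out.println(retryTopicFor("consumer1")); // %RETRY%consumer1
        System.out.println(isRetryTopic("TopicTest"));  // false
    }
}
```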

NameServer

Before any topic has been created by a client, the name server log shows that RocketMQ registers a number of topics by default.

2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:48210
2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:48210]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, charse-thinkpad QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, BenchmarkTest QueueData [brokerName=charse-thinkpad, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, TBW102 QueueData [brokerName=charse-thinkpad, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, DefaultCluster QueueData [brokerName=charse-thinkpad, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new broker registered, 192.168.3.16:10911 HAServer: 192.168.3.16:10912
2019-03-31 19:40:12 INFO RemotingExecutorThread_4 - new topic registered, RMQ_SYS_TRANS_HALF_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]

When a client creates a topic (TopicTest), the name server log shows that the topic has been registered.

2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49208
2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO RemotingExecutorThread_4 - new topic registered, TopicTest QueueData [brokerName=charse-thinkpad, readQueueNums=4, writeQueueNums=4, perm=6, topicSynFlag=0]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelInactive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelUnregistered, the channel[127.0.0.1:49208]
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49224
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49224]
2019-03-31 20:38:35 INFO RemotingExecutorThread_4 - new topic registered, %RETRY%consumer1 QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
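
The broker log prints TopicTest with perm=RW-, while the name server prints the same topic with perm=6. The sketch below decodes such a number into the three-character form. It assumes the bit layout read=4, write=2, inherit=1, which is my understanding of RocketMQ's PermName class; treat the constants as an assumption and check them against the version you run. PermDecoder itself is a hypothetical helper.

```java
public class PermDecoder {
    // Assumed bit layout (see the note above).
    static final int READ = 1 << 2;    // 4
    static final int WRITE = 1 << 1;   // 2
    static final int INHERIT = 1 << 0; // 1

    // Converts a numeric perm value into a string such as "RW-".
    static String decode(int perm) {
        StringBuilder sb = new StringBuilder("---");
        if ((perm & READ) != 0) {
            sb.setCharAt(0, 'R');
        }
        if ((perm & WRITE) != 0) {
            sb.setCharAt(1, 'W');
        }
        if ((perm & INHERIT) != 0) {
            sb.setCharAt(2, 'X');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(decode(6)); // RW-
        System.out.println(decode(7)); // RWX
    }
}
```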

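When the consumer shuts down, the broker log shows the client's consumer, consumer1, being unregistered from consumerGroupInfo, and then CLIENT_INNER_PRODUCER being unregistered from groupChannelTable.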

2019-03-31 21:50:08 INFO HeartbeatThread_3 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]]] 192.168.3.16:49810
2019-03-31 21:50:08 INFO HeartbeatThread_3 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040208756, expressionType=TAG]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister a consumer[consumer1] from consumerGroupInfo ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208763]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister consumer ok, no any connection, and remove consumer group, consumer1
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208801]
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 21:50:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 21:50:28 WARN PullMessageThread_2 - the consumer's group info not exist, group: consumer1
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - processRequestWrapper response to /192.168.3.16:49810 failed
java.nio.channels.ClosedChannelException: null
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=11, language=JAVA, version=293, opaque=24, flag(B)=0, remark=null, extFields={queueId=0, maxMsgNums=32, sysFlag=2, suspendTimeoutMillis=15000, commitOffset=0, topic=%RETRY%consumer1, queueOffset=0, expressionType=TAG, subVersion=1554040208756, consumerGroup=consumer1}, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=24, language=JAVA, version=293, opaque=24, flag(B)=1, remark=the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details., extFields=null, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 21:51:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes

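Producer and Consumer Examples

  • Send messages with RocketMQ in three ways: reliable synchronous, reliable asynchronous, and one-way transmission
  • Consume messages with RocketMQ

1. Add the dependency

Maven: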

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.3.0</version>
</dependency>

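2.1 Send messages synchronously

Reliable synchronous transmission is widely used for important notification messages, SMS notifications, SMS marketing systems, and similar scenarios.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {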
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                                      "TagA" /* Tag */,
                                      ("Hello RocketMQ " +
                                       i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                                     );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}

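2.2 Send messages asynchronously

Asynchronous transmission is generally used in response-time-sensitive business scenarios.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        //Tracks outstanding callbacks so the producer is not shut down while sends are still in flight.
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            //Create a message instance, specifying topic, tag, key and message body.
            Message msg = new Message("TopicTest",
                                      "TagA",
                                      "OrderID188",
                                      "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                                      sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        //Wait up to 5 seconds for the callbacks to complete.
        countDownLatch.await(5, TimeUnit.SECONDS);
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}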

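2.3 One-way transmission

One-way transmission is used for cases that require only moderate reliability, such as log collection.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {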
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                                      "TagA" /* Tag */,
                                      ("Hello RocketMQ " +
                                       i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                                     );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
            
        }
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}

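3. Consume messages

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {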
    
    public static void main(String[] args) throws InterruptedException, MQClientException {
        
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        
        // Subscribe to one or more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        
        //Launch the consumer instance.
        consumer.start();
        
        System.out.printf("Consumer Started.%n");
    }
}

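More RocketMQ usage examples can be found here.

Ordered Messages

RocketMQ provides ordered messages using FIFO order. The following example demonstrates sending and receiving globally and partitionally ordered messages.

Producer example

import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OrderedProducer {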
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name");
        //Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //Select the queue by order ID so that messages with the same ID go to the same queue.
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
}
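
The selector in the producer above keeps messages with the same order ID in the same queue by taking the ID modulo the queue count, which is what preserves per-ID ordering. The arithmetic can be checked in isolation with the sketch below. QueueSelectorSketch is a hypothetical stand-alone helper, and using Math.floorMod (so that negative IDs still map to a valid index) is my own choice rather than something the example does.

```java
public class QueueSelectorSketch {
    // Maps an order id to a queue index in [0, queueCount).
    static int selectQueueIndex(int orderId, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(orderId, queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // TopicTest was auto-created with 4 write queues in the log
        for (int i = 0; i < 12; i++) {
            int orderId = i % 10; // same derivation as in the producer example
            System.out.printf("message %d -> orderId %d -> queue %d%n",
                    i, orderId, selectQueueIndex(orderId, queueCount));
        }
    }
}
```

Messages 0 and 10 share orderId 0 and therefore land in the same queue, while different IDs may still share a queue when there are more IDs than queues.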

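Consumer example

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class OrderedConsumer {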
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

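Broadcasting

Broadcasting sends a message to all subscribers of a topic. If you want every subscriber to receive the messages of a topic, broadcasting is a good choice.

Producer example

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class BroadcastProducer {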
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        for (int i = 0; i < 100; i++){
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

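Consumer example

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class BroadcastConsumer {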
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //set to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}
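
To see how broadcasting differs from the default CLUSTERING model that appears in the earlier broker log, the following toy simulation hands out message IDs to the consumers of one group. It is a deliberately simplified sketch: real clustering assigns queues rather than individual messages to consumers, and the round-robin rule used here is an assumption made only to keep the example short. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class DeliveryModelSketch {
    enum Model { CLUSTERING, BROADCASTING }

    // Returns, for each consumer, the list of message ids it would receive.
    static List<List<Integer>> deliver(Model model, int consumerCount, int messageCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        List<List<Integer>> inbox = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            inbox.add(new ArrayList<>());
        }
        for (int m = 0; m < messageCount; m++) {
            if (model == Model.BROADCASTING) {
                // Every consumer in the group receives every message.
                for (List<Integer> box : inbox) {
                    box.add(m);
                }
            } else {
                // Each message goes to exactly one consumer (round-robin is an assumption).
                inbox.get(m % consumerCount).add(m);
            }
        }
        return inbox;
    }

    public static void main(String[] args) {
        System.out.println(deliver(Model.BROADCASTING, 2, 3)); // [[0, 1, 2], [0, 1, 2]]
        System.out.println(deliver(Model.CLUSTERING, 2, 3));   // [[0, 2], [1]]
    }
}
```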

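Scheduled Messages

Scheduled messages differ from normal messages in that they are not delivered until a specified delay has elapsed.

1. Start a consumer to wait for incoming subscribed messages

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ScheduledMessageConsumer {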
    
     public static void main(String[] args) throws Exception {
         // Instantiate message consumer
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
         // Subscribe topics
         consumer.subscribe("TestTopic", "*");
         // Register message listener
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                 for (MessageExt message : messages) {
                     // Print approximate delay time period
                     System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                             + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         // Launch consumer
         consumer.start();
     }
 }

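2. Send scheduled messages

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {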
    
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
      
        // Shutdown producer after use.
        producer.shutdown();
    }
    
 }

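messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Level 0 means no delay, level 1 is the first delay level (1s), level 2 is the second (5s), and so on. With level 3, as in the producer above, you can observe that each message is consumed about 10 seconds after it was stored.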
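
The mapping from a delay level to an actual delay can be worked out directly from the messageDelayLevel string. The sketch below parses that string and returns the delay in milliseconds. DelayLevelTable is a hypothetical helper, not a RocketMQ class, and rejecting out-of-range levels with an exception is a choice made for this sketch rather than a statement about how the broker behaves.

```java
public class DelayLevelTable {
    // The level list quoted in this article.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Returns the delay in milliseconds for a 1-based level; level 0 means no delay.
    static long delayMillis(String levels, int level) {
        if (level <= 0) {
            return 0L;
        }
        String[] parts = levels.trim().split("\\s+");
        if (level > parts.length) {
            throw new IllegalArgumentException("level out of range: " + level);
        }
        String token = parts[level - 1];
        long value = Long.parseLong(token.substring(0, token.length() - 1));
        char unit = token.charAt(token.length() - 1);
        switch (unit) {
            case 's': return value * 1000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        // Level 3 is the value passed to setDelayTimeLevel in the producer example.
        System.out.println(delayMillis(DEFAULT_LEVELS, 3)); // 10000
    }
}
```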

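Batch Sending

  • Why batch?

Sending messages in batches improves the throughput of small messages.

  • Usage restrictions

Messages in the same batch should have the same topic and the same waitStoreMsgOK, and scheduled (delayed) messages are not supported. In addition, the total size of the messages in one batch should not exceed 1 MB.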
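
The batch restrictions (same topic, same waitStoreMsgOK, no delayed messages, total size within the limit) can be expressed as a simple pre-check. The sketch below uses a hypothetical Msg class as a stand-in for the real message type so that it runs without the RocketMQ client. It counts only body bytes and takes 1000 * 1000 as the limit to match the ListSplitter in this article; both simplifications are assumptions of the sketch.

```java
import java.util.Arrays;
import java.util.List;

public class BatchRulesSketch {
    static final int MAX_BATCH_BYTES = 1000 * 1000;

    // Hypothetical stand-in for a message; not the RocketMQ Message class.
    static final class Msg {
        final String topic;
        final boolean waitStoreMsgOK;
        final int delayLevel;
        final byte[] body;

        Msg(String topic, boolean waitStoreMsgOK, int delayLevel, byte[] body) {
            this.topic = topic;
            this.waitStoreMsgOK = waitStoreMsgOK;
            this.delayLevel = delayLevel;
            this.body = body;
        }
    }

    // Returns true if the batch satisfies all of the stated restrictions.
    static boolean isValidBatch(List<Msg> batch) {
        if (batch.isEmpty()) {
            return false;
        }
        Msg first = batch.get(0);
        long totalBytes = 0;
        for (Msg m : batch) {
            if (!m.topic.equals(first.topic)) return false;        // same topic
            if (m.waitStoreMsgOK != first.waitStoreMsgOK) return false; // same flag
            if (m.delayLevel > 0) return false;                     // no delayed messages
            totalBytes += m.body.length;
        }
        return totalBytes <= MAX_BATCH_BYTES;
    }

    public static void main(String[] args) {
        List<Msg> batch = Arrays.asList(
                new Msg("BatchTest", true, 0, "Hello world 0".getBytes()),
                new Msg("BatchTest", true, 0, "Hello world 1".getBytes()));
        System.out.println(isValidBatch(batch)); // true
    }
}
```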

  • How to use batch sending

If you send no more than 1 MB of messages at a time, batching is straightforward:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}
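
  • Splitting a large batch into lists

Complexity only grows when you send a large batch and cannot be sure whether it exceeds the 1 MB size limit. In that case, you should split the messages into smaller lists.

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {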
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                   //if the next sublist has no element, add this one and then break, otherwise just break
                   nextIndex++;  
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
    
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       //handle the error
   }
}

Message Filter Example

In most cases, a tag is a simple and useful design for selecting the messages you want, for example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

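In the example above, the consumer receives messages that carry TAGA, TAGB, or TAGC. The limitation is that a message can have only one tag, which may not be enough for complex scenarios. In such cases, you can use an SQL expression to filter messages.

  • Principle

The SQL feature can perform calculations on the properties that are set when a message is sent. Within the syntax defined by RocketMQ, you can implement some interesting logic.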

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------
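The two cases in the diagram can be reproduced with a small sketch. It hand-codes the condition a > 5 AND b = 'abc' over a map of string properties instead of parsing SQL, so it only illustrates the outcome, not how the broker evaluates expressions. SqlFilterSketch and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; the real expression is parsed and evaluated by the broker.
class SqlFilterSketch {

    // Hand-coded equivalent of: a > 5 AND b = 'abc'
    // A missing or non-numeric "a" makes the condition false instead of throwing.
    static boolean matches(Map<String, String> properties) {
        String a = properties.get("a");
        String b = properties.get("b");
        if (a == null || b == null) {
            return false;
        }
        try {
            return Double.parseDouble(a) > 5 && "abc".equals(b);
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Build the property map of one message from the diagram.
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> properties = new HashMap<>();
        properties.put("a", a);
        properties.put("b", b);
        properties.put("c", c);
        return properties;
    }

    public static void main(String[] args) {
        // First message in the diagram: a = 10, so the condition holds.
        System.out.println(matches(message("10", "abc", "true")) ? "Gotten" : "Missed");
        // Second message: a = 1 fails "a > 5".
        System.out.println(matches(message("1", "abc", "true")) ? "Gotten" : "Missed");
    }
}
```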
  • Syntax

RocketMQ defines only some basic grammar to support this feature. You can also extend it easily.

  1. Numeric comparison, like >, >=, <, <=, BETWEEN, =;
  2. Character comparison, like =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. Logical AND, OR, NOT;

Constant types:

  1. Numeric, like 123, 3.1415;
  2. Character, like 'abc', must be made with single quotes;
  3. NULL, special constant;
  4. Boolean, TRUE or FALSE;
  • Usage restrictions

Only push consumers can select messages with SQL92. The interface is:

public void subscribe(final String topic, final MessageSelector messageSelector)
  • Producer example

When sending, you can put properties into a message with the putUserProperty method.

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);
   
producer.shutdown();
  • Consumer example

Use MessageSelector.bySql to select messages with SQL92 when consuming.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// Only subscribe to messages that have property a, with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
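As a worked example of the producer and consumer pair above, the sketch below assumes a producer loop that sets a = String.valueOf(i) for i from 0 to 9 and lists which messages the condition a between 0 and 3 would select. Both bounds are inclusive, as the comment in the consumer code states. BetweenFilterSketch is a hypothetical class that only mimics the result of the filter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the selection result; not RocketMQ code.
class BetweenFilterSketch {

    // Equivalent of: a BETWEEN low AND high (both bounds inclusive).
    // User properties are strings, so the value is parsed before comparing.
    static boolean between(String value, long low, long high) {
        if (value == null) {
            return false; // a message without property "a" is not selected
        }
        try {
            long a = Long.parseLong(value);
            return a >= low && a <= high;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Simulate a producer that sets a = String.valueOf(i) for i in [0, count).
    static List<Integer> selected(int count, long low, long high) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            if (between(String.valueOf(i), low, high)) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(selected(10, 0, 3)); // prints [0, 1, 2, 3]
    }
}
```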

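OpenMessaging

OpenMessaging includes the establishment of industry guidelines and messaging and streaming specifications, providing a common framework for finance, e-commerce, IoT, and big data. Its design principles are cloud orientation, simplicity, flexibility, and language independence in heterogeneous environments. Conforming to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems. RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples demonstrate how to access RocketMQ through OpenMessaging.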

  • OMSProducer

The following example shows how to send messages with RocketMQ in synchronous, asynchronous, and one-way modes.

public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
  • OMSPullConsumer

Use OMSPullConsumer to pull messages from a specified queue.

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        
        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
  • OMSPushConsumer

OMSPushConsumer attaches to a specified queue and consumes messages through a MessageListener.

public class OMSPushConsumer {
	public static void main(String[] args) {
		final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
			.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
		
		final PushConsumer consumer = messagingAccessPoint.
			createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
		
		messagingAccessPoint.startup();
		System.out.printf("MessagingAccessPoint startup OK%n");
		
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
			@Override
			public void run() {
				consumer.shutdown();
				messagingAccessPoint.shutdown();
			}
		}));
		
		consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
			@Override
			public void onMessage(final Message message, final ReceivedMessageContext context) {
				System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
				context.ack();
			}
		});
		
	}
}

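Transactional Message Example

What is a transactional message?

It can be thought of as a two-phase commit message implementation that ensures eventual consistency in a distributed system. A transactional message guarantees that executing the local transaction and sending the message happen atomically.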

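Usage restrictions

  • Transactional messages do not support scheduled delivery or batch operations.
  • To avoid a single message being checked too many times and causing half-queue messages to accumulate, the number of checks for a single message is limited to 15 by default. You can change this limit through the transactionCheckMax parameter in the broker configuration. If a message has been checked more than transactionCheckMax times, the broker by default discards it and prints an error log. You can change this behavior by overriding the AbstractTransactionCheckListener class.
  • A transactional message is checked after a period of time determined by the transactionTimeout parameter in the broker configuration. You can also change this limit by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS when sending a transactional message; this property takes precedence over the transactionMsgTimeout parameter.
  • A transactional message may be checked or consumed more than once.
  • Re-putting a committed message into the user's target topic may fail. Currently this depends on the log record. High availability is ensured by RocketMQ's own high-availability mechanism. If you want to ensure that transactional messages are not lost and that transaction integrity is guaranteed, the synchronous double-write mechanism is recommended.
  • The producer ID of transactional messages cannot be shared with the producer ID of other types of messages. Unlike other message types, transactional messages allow backward queries: the MQ server queries clients by their producer ID.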
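The check limit described in the list above can be sketched as a bounded retry loop. This is a simulation under stated assumptions, not broker code: TransactionCheckSketch, its enums, and DEFAULT_CHECK_MAX are hypothetical names, and the Supplier stands in for the client answering a check request.

```java
import java.util.function.Supplier;

// Hypothetical simulation of the broker-side check limit; not RocketMQ code.
class TransactionCheckSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }
    enum Outcome { DELIVERED, DELETED, DISCARDED }

    // Assumed default, mirroring the limit of 15 checks described in the text.
    static final int DEFAULT_CHECK_MAX = 15;

    static final class Result {
        final Outcome outcome;
        final int checks;

        Result(Outcome outcome, int checks) {
            this.outcome = outcome;
            this.checks = checks;
        }
    }

    // Ask the producer for the local transaction state at most checkMax times.
    // The message is discarded if every answer is UNKNOWN.
    static Result resolve(Supplier<State> check, int checkMax) {
        for (int checks = 1; checks <= checkMax; checks++) {
            State state = check.get();
            if (state == State.COMMIT) {
                return new Result(Outcome.DELIVERED, checks);
            }
            if (state == State.ROLLBACK) {
                return new Result(Outcome.DELETED, checks);
            }
        }
        return new Result(Outcome.DISCARDED, checkMax);
    }

    public static void main(String[] args) {
        // A producer that never resolves the transaction.
        Result stuck = resolve(() -> State.UNKNOWN, DEFAULT_CHECK_MAX);
        System.out.println(stuck.outcome + " after " + stuck.checks + " checks");

        // A producer that answers COMMIT on the third check.
        int[] calls = {0};
        Result late = resolve(() -> ++calls[0] < 3 ? State.UNKNOWN : State.COMMIT, DEFAULT_CHECK_MAX);
        System.out.println(late.outcome + " after " + late.checks + " checks");
    }
}
```

The sketch shows why a listener that keeps answering with the intermediate state eventually loses the message: once the limit is reached, the half message is dropped rather than delivered.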

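Transactional status

  • TransactionStatus.CommitTransaction: commit the transaction, which means consumers are allowed to consume this message.
  • TransactionStatus.RollbackTransaction: roll back the transaction, which means the message will be deleted and cannot be consumed.
  • TransactionStatus.Unknown: an intermediate state, which means MQ needs to check back to determine the status.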
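The effect of the three states on message visibility can be sketched with an in-memory stand-in for the broker. HalfMessageSketch and all of its members are hypothetical and exist only for illustration; the real broker persists half messages in an internal topic and drives the check requests itself.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of half messages; not RocketMQ code.
class HalfMessageSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Half messages are stored but invisible to consumers.
    private final Map<String, String> halfMessages = new LinkedHashMap<>();
    private final List<String> visibleToConsumers = new ArrayList<>();

    // Phase 1: store the half message, then run the local transaction.
    // Phase 2: apply the state that the local transaction reports.
    State sendInTransaction(String id, String body, Function<String, State> localTransaction) {
        halfMessages.put(id, body);
        State state = localTransaction.apply(body);
        endTransaction(id, state);
        return state;
    }

    void endTransaction(String id, State state) {
        if (state == State.COMMIT) {
            String body = halfMessages.remove(id);
            if (body != null) {
                visibleToConsumers.add(body); // now consumers may receive it
            }
        } else if (state == State.ROLLBACK) {
            halfMessages.remove(id); // deleted, never consumed
        }
        // UNKNOWN: the half message stays pending until a later check resolves it.
    }

    List<String> visible() {
        return visibleToConsumers;
    }

    int pending() {
        return halfMessages.size();
    }

    public static void main(String[] args) {
        HalfMessageSketch broker = new HalfMessageSketch();
        broker.sendInTransaction("m1", "order-1", body -> State.COMMIT);
        broker.sendInTransaction("m2", "order-2", body -> State.ROLLBACK);
        broker.sendInTransaction("m3", "order-3", body -> State.UNKNOWN);
        System.out.println("visible=" + broker.visible() + ", pending=" + broker.pending());

        // A later check resolves the pending message.
        broker.endTransaction("m3", State.COMMIT);
        System.out.println("visible=" + broker.visible() + ", pending=" + broker.pending());
    }
}
```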

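Creating transactional messages

Use the TransactionMQProducer class to create the producer client and specify a unique producerGroup. You can set a custom thread pool to process check requests. After executing the local transaction, you need to reply to MQ according to the execution result; the reply states are described in the section above.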

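import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;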

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

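Implementing the TransactionListener interface

The executeLocalTransaction method is used to execute the local transaction once the half message has been sent successfully. It returns one of the three transaction states mentioned in the previous section.

The checkLocalTransaction method is used to check the local transaction state and respond to MQ check requests. It also returns one of the three transaction states mentioned in the previous section.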

   import ...
   
   public class TransactionListenerImpl implements TransactionListener {
       private AtomicInteger transactionIndex = new AtomicInteger(0);
   
       private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
   
       @Override
       public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
           int value = transactionIndex.getAndIncrement();
           int status = value % 3;
           localTrans.put(msg.getTransactionId(), status);
           return LocalTransactionState.UNKNOW;
       }
   
       @Override
       public LocalTransactionState checkLocalTransaction(MessageExt msg) {
           Integer status = localTrans.get(msg.getTransactionId());
           if (null != status) {
               switch (status) {
                   case 0:
                       return LocalTransactionState.UNKNOW;
                   case 1:
                       return LocalTransactionState.COMMIT_MESSAGE;
                   case 2:
                       return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }
           return LocalTransactionState.COMMIT_MESSAGE;
       }
   }
