RocketMQ消息详解

1,118 阅读5分钟

基本消息示例,发送和消费

发送消息步骤

1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址, 用于broker的发现
3.启动producer
4.创建消息对象,指定主题Topic、Tag、消息体
5.发送消息
6.关闭生产者producer

消息消费者步骤

1.创建消费者Consumer,指定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
#### 基本发送消息示例
  1. 同步发送消息
// 1. 创建生产者
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("group1");

// 2. 设置 NameServer 集群地址, 多个用;号隔开
defaultMQProducer.setNamesrvAddr("192.168.162.129:9876");

// 3. 启动 producer
defaultMQProducer.start();

// 4. 创建消息对象
Message message = new Message("topic1", "tag1", "hello rocketmq".getBytes());
// 4. 发送同步消息
SendResult result = defaultMQProducer.send(message);
System.out.println("result = " + result);

// 5. 如果不再使用,则关闭producer
defaultMQProducer.shutdown();
  1. 异步发送消息,接收回调
//1. 创建producer
DefaultMQProducer mqProducer = new DefaultMQProducer("group1");

//2. 设置nameServer
mqProducer.setNamesrvAddr("192.168.162.129:9876");

//3. 启动producer
mqProducer.start();

// 设置失败重试次数
mqProducer.setRetryTimesWhenSendAsyncFailed(1);

// 创建消息对象
Message message = new Message("topic1", "tag1", "hello async message".getBytes());

//4. 发送异步消息,设置回调,接收结果
mqProducer.send(message, new SendCallback() {

    public void onSuccess(SendResult sendResult) {
        System.out.println("sendResult = " + sendResult);
    }

    public void onException(Throwable throwable) {
        throwable.printStackTrace();
    }
});

Thread.sleep(1000);
// 不能再发送消息完成前关闭
// 如果不再发送消息,关闭Producer实例。
mqProducer.shutdown();
  1. 发送单向消息,如生产者不在乎结果,如发送日志消息
//1. 创建producer
DefaultMQProducer mqProducer = new DefaultMQProducer("group1");

//2. 设置nameServer
mqProducer.setNamesrvAddr("192.168.162.129:9876");

//3. 启动producer
mqProducer.start();

// 创建消息对象
Message message = new Message("topic1", "tag1", "hello oneway message".getBytes());

//4. 发送单向消息
mqProducer.sendOneway(message);

// 如果不再发送消息,关闭Producer实例。
mqProducer.shutdown();

基本 push consumer 消费消息示例

//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("group1");

//2.指定Nameserver地址
pushConsumer.setNamesrvAddr("192.168.162.129:9876");
//3.订阅主题Topic和Tag
pushConsumer.subscribe("topic1", "tag1");
//4. 注册消息监听器
/**
 * 参数一: MessageListenerConcurrently, 会使用多线程,并发消费消息
 */
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//      System.out.println("list = " + list);
        for (MessageExt messageExt : list) {
            System.out.println(new java.lang.String(messageExt.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
//5.启动消费者consumer
pushConsumer.start();

消费消息模式---负载均衡模式

  1. 消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);

消费消息模式---广播模式

  1. 消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);

顺序消息

  1. 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
  2. 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的
  3. 发送消息的步骤
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("base-message");

//2.指定Nameserver地址, 用于broker的发现
defaultMQProducer.setNamesrvAddr("192.168.141.129:9876");

//3.启动producer
defaultMQProducer.start();

//5.发送 10条消息
for (int i = 0; i < 10; i++) {

    Message message = new Message("order-topic", "product", ("order message " + i).getBytes());
    /**
     * 参数一: 消息对象
     * 参数二: 队列的子类/队列选择器实现类
     * 参数三: 业务的唯一标,传递给队列选择器的参数,用来判断选择何种队列
     *
     */
    SendResult result = defaultMQProducer.send(message, new MessageQueueSelector() {

        // 队列选择器,我们要在这里根据 args 来返回 同一个 队列
        @Override
        /**
         * 参数一: 该topic下 所有的队列
         * 参数二: 消息对象
         * 参数三: 外面传递的,业务唯一标识符,如订单ID,用来找到相同队列使用
         */
        public MessageQueue select(List<MessageQueue> list, Message message, Object args) {
            // 把需要顺序的消息,利用某种算法,放到同一个队列里
            // 消费时可保证顺序
            // 如:张三 + 订单id 放到同一个队列里,
            // 这里假设10条消息,都是同一个人发的,我们把它丢到第一个队列里
            return list.get(0);
        }
    }, i);
    System.out.println("result = " + result);
}
//6.关闭生产者producer
defaultMQProducer.shutdown();
  1. 消费顺序消息的步骤
//1.创建消费者Consumer,指定消费者组名
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("async-topic");
//2.指定Nameserver地址
pushConsumer.setNamesrvAddr("192.168.141.129:9876");
//3.订阅主题Topic和Tag
pushConsumer.subscribe("order-topic", "*");
//4. 注册消息监听器
/**
 * 参数一: MessageListenerOrderly
 */
pushConsumer.registerMessageListener(new MessageListenerOrderly() {

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {

        for (MessageExt ext : list) {
            String msg = new String(ext.getBody());

            System.out.println("thread: " + Thread.currentThread().getName() + " msg: " + msg);
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
//5.启动消费者consumer
pushConsumer.start();
  1. 消息队列负载由Rebalance线程默认每隔20s进行一次消息队列负载,获取主题队列信息mqSet与消费组当前所有消费者cidAll,然后按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消息队列,同一个消息消费队列同一时间只会分配给一个消费者。

延迟消息

  1. 延迟消息
// 设定延迟时间等级 当前为级别2(延迟5秒)
message.setDelayTimeLevel(2);
  1. 使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

消息过滤

通过Tag过滤消费消息

  1. 通过tag,producer和consumer的tag必须相同,或者可以tag1 || tag2 || tag3 或者可以 通过 * 消费相同主题下的所有tag消息。

通过简单Sql过滤消息

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:'abc',必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

事务消息