基本消息示例,发送和消费
发送消息步骤
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址, 用于broker的发现
3.启动producer
4.创建消息对象,指定主题Topic、Tag、消息体
5.发送消息
6.关闭生产者producer
消息消费者步骤
1.创建消费者Consumer,指定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
#### 基本发送消息示例
- 同步发送消息
// 1. 创建生产者
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("group1");
// 2. 设置 NameServer 集群地址, 多个用;号隔开
defaultMQProducer.setNamesrvAddr("192.168.162.129:9876");
// 3. 启动 producer
defaultMQProducer.start();
// 4. 创建消息对象
Message message = new Message("topic1", "tag1", "hello rocketmq".getBytes());
// 4. 发送同步消息
SendResult result = defaultMQProducer.send(message);
System.out.println("result = " + result);
// 5. 如果不再使用,则关闭producer
defaultMQProducer.shutdown();
- 异步发送消息,接收回调
//1. 创建producer
DefaultMQProducer mqProducer = new DefaultMQProducer("group1");
//2. 设置nameServer
mqProducer.setNamesrvAddr("192.168.162.129:9876");
//3. 启动producer
mqProducer.start();
// 设置失败重试次数
mqProducer.setRetryTimesWhenSendAsyncFailed(1);
// 创建消息对象
Message message = new Message("topic1", "tag1", "hello async message".getBytes());
//4. 发送异步消息,设置回调,接收结果
mqProducer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("sendResult = " + sendResult);
}
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
Thread.sleep(1000);
// 不能再发送消息完成前关闭
// 如果不再发送消息,关闭Producer实例。
mqProducer.shutdown();
- 发送单向消息,如生产者不在乎结果,如发送日志消息
//1. 创建producer
DefaultMQProducer mqProducer = new DefaultMQProducer("group1");
//2. 设置nameServer
mqProducer.setNamesrvAddr("192.168.162.129:9876");
//3. 启动producer
mqProducer.start();
// 创建消息对象
Message message = new Message("topic1", "tag1", "hello oneway message".getBytes());
//4. 发送单向消息
mqProducer.sendOneway(message);
// 如果不再发送消息,关闭Producer实例。
mqProducer.shutdown();
基本 push consumer 消费消息示例
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
pushConsumer.setNamesrvAddr("192.168.162.129:9876");
//3.订阅主题Topic和Tag
pushConsumer.subscribe("topic1", "tag1");
//4. 注册消息监听器
/**
* 参数一: MessageListenerConcurrently, 会使用多线程,并发消费消息
*/
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// System.out.println("list = " + list);
for (MessageExt messageExt : list) {
System.out.println(new java.lang.String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
pushConsumer.start();
消费消息模式---负载均衡模式
- 消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
消费消息模式---广播模式
- 消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
顺序消息
- 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
- 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的
- 发送消息的步骤
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("base-message");
//2.指定Nameserver地址, 用于broker的发现
defaultMQProducer.setNamesrvAddr("192.168.141.129:9876");
//3.启动producer
defaultMQProducer.start();
//5.发送 10条消息
for (int i = 0; i < 10; i++) {
Message message = new Message("order-topic", "product", ("order message " + i).getBytes());
/**
* 参数一: 消息对象
* 参数二: 队列的子类/队列选择器实现类
* 参数三: 业务的唯一标,传递给队列选择器的参数,用来判断选择何种队列
*
*/
SendResult result = defaultMQProducer.send(message, new MessageQueueSelector() {
// 队列选择器,我们要在这里根据 args 来返回 同一个 队列
@Override
/**
* 参数一: 该topic下 所有的队列
* 参数二: 消息对象
* 参数三: 外面传递的,业务唯一标识符,如订单ID,用来找到相同队列使用
*/
public MessageQueue select(List<MessageQueue> list, Message message, Object args) {
// 把需要顺序的消息,利用某种算法,放到同一个队列里
// 消费时可保证顺序
// 如:张三 + 订单id 放到同一个队列里,
// 这里假设10条消息,都是同一个人发的,我们把它丢到第一个队列里
return list.get(0);
}
}, i);
System.out.println("result = " + result);
}
//6.关闭生产者producer
defaultMQProducer.shutdown();
- 消费顺序消息的步骤
//1.创建消费者Consumer,指定消费者组名
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("async-topic");
//2.指定Nameserver地址
pushConsumer.setNamesrvAddr("192.168.141.129:9876");
//3.订阅主题Topic和Tag
pushConsumer.subscribe("order-topic", "*");
//4. 注册消息监听器
/**
* 参数一: MessageListenerOrderly
*/
pushConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt ext : list) {
String msg = new String(ext.getBody());
System.out.println("thread: " + Thread.currentThread().getName() + " msg: " + msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//5.启动消费者consumer
pushConsumer.start();
- 消息队列负载由Rebalance线程默认每隔20s进行一次消息队列负载,获取主题队列信息mqSet与消费组当前所有消费者cidAll,然后按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消息队列,同一个消息消费队列同一时间只会分配给一个消费者。
延迟消息
- 延迟消息
// 设定延迟时间等级 当前为级别2(延迟5秒)
message.setDelayTimeLevel(2);
- 使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
消息过滤
通过Tag过滤消费消息
- 通过tag,producer和consumer的tag必须相同,或者可以
tag1 || tag2 || tag3
或者可以 通过*
消费相同主题下的所有tag消息。
通过简单Sql过滤消息
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)