摘要: 原创出处 www.iocoder.cn/RocketMQ/sp… 「芋道源码」欢迎转载,保留摘要,谢谢!
- RocketMQ 介绍
- RocketMQ 基本使用
- Spring Cloud Stream 介绍
- Spring Cloud Alibaba RocketMQ Binder 实现原理
- Endpoint支持
🙂🙂🙂关注微信公众号:【芋道源码】有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
- 认真的源码交流微信群。
坑已经挖好,就看 Spring Cloud Alibaba 更新这块。
貌似 github.com/spring-clou… 已经可以看到了。
看了下开发者是小马哥,瞬间就放心了~
如下内容,暂时先转载 《Spring Cloud Alibaba RocketMQ Binder 原理》 。
RocketMQ 介绍
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
RocketMQ 基本使用
- 下载 RocketMQ
下载 RocketMQ最新的二进制文件,并解压
解压后的目录结构如下:
apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
- 启动 NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
- 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
- 发送、接收消息
发送消息:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
发送成功后显示:SendResult [sendStatus=SEND_OK, msgId= …
接收消息:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
接收成功后显示:ConsumeMessageThread_%d Receive New Messages: [MessageExt…
- 关闭 Server
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
Spring Cloud Stream 介绍
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration
与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
- Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
比如 Kafka
的实现 KafkaMessageChannelBinder
,RabbitMQ
的实现 RabbitMessageChannelBinder
以及 RocketMQ
的实现 RocketMQMessageChannelBinder
。
- Binding: 包括 Input Binding 和 Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
Figure 1. Spring Cloud Stream
使用 Spring Cloud Stream 完成一段简单的消息发送和消息接收代码:
MessageChannel messageChannel = new DirectChannel();
// 消息订阅
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("receive msg: " + message.getPayload());
}
});
// 消息发送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
这段代码所有的消息类都是 spring-messaging
模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。
Spring Cloud Stream 底层也是基于这段代码去做了各种抽象。
Spring Cloud Alibaba RocketMQ Binder 实现原理
Figure 2. RocketMQ Binder处理流程
RocketMQ Binder 的核心主要就是这3个类:RocketMQMessageChannelBinder
,RocketMQInboundChannelAdapter
和 RocketMQMessageHandler
。
RocketMQMessageChannelBinder
是个标准的 Binder 实现,其内部构建 RocketMQInboundChannelAdapter
和 RocketMQMessageHandler
。
RocketMQMessageHandler
用于 RocketMQ Producer
的启动以及消息的发送,其内部会根据 spring-messaging
模块内 org.springframework.messaging.Message
消息类,去创建 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message
。
在构造 org.apache.rocketmq.common.message.Message
的过程中会根据 org.springframework.messaging.Message
的 Header 构造成 RocketMQMessageHeaderAccessor
。然后再根据 RocketMQMessageHeaderAccessor
中的一些属性,比如 tags、keys、flag等属性设置到
RocketMQ 的消息类 org.apache.rocketmq.common.message.Message
中。RocketMQMessageHeaderAccessor
中的 Key 可以参考下面的表格进行获取或设置:
配置项 | 含义 |
---|---|
ACKNOWLEDGEMENT | 获取 Acknowledgement |
TAGS | Message Tags |
KEYS | Message Keys |
ORIGINAL_ROCKETMQ_MESSAGE | RocketMQ 消息对象 MessageExt |
DELAY | Message Delay Level |
ROCKETMQ_FLAG | Message Flag |
ROCKETMQ_TRANSACTIONAL_ARG | 事务消息中 LocalTransactionExecuter 中使用的参数 |
ROCKETMQ_SEND_RESULT | 消息发送结果 |
RocketMQInboundChannelAdapter
用于 RocketMQ Consumer
的启动以及消息的接收。其内部还支持 spring-retry 的使用。
在消费消息的时候可以从 Header 中获取 Acknowledgement
并进行一些设置。
比如使用 MessageListenerConcurrently
进行异步消费的时候,可以设置延迟消费:
@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER);
acknowledgement.setConsumeConcurrentlyDelayLevel(1);
}
比如使用 MessageListenerOrderly
进行顺序消费的时候,可以设置延迟消费:
@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}
Provider端支持的配置:
配置项 | 含义 | 默认值 | ||
---|---|---|---|---|
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled |
是否启用consumer | true | ||
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags |
Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 “\ | \ | “ 分割(不填表示不进行tags的过滤,订阅所有消息) | |
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql |
Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容,sql的优先级更高) | |||
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting |
Consumer是否是广播模式 | false | ||
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly |
顺序消费 or 异步消费 | false |
Endpoint支持
在使用Endpoint特性之前需要在 Maven 中添加 spring-boot-starter-actuator
依赖,并在配置中允许 Endpoints 的访问。
- Spring Boot 1.x 中添加配置
management.security.enabled=false
。暴露的 endpoint 路径为/rocketmq_binder
- Spring Boot 2.x 中添加配置
management.endpoints.web.exposure.include=*
。暴露的 endpoint 路径为/actuator/rocketmq-binder
Endpoint 会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。
{
"runtime": {
"lastSend.timestamp": 1542786623915
},
"metrics": {
"scs-rocketmq.consumer.test-topic.totalConsumed": {
"count": 11
},
"scs-rocketmq.consumer.test-topic.totalConsumedFailures": {
"count": 0
},
"scs-rocketmq.producer.test-topic.totalSentFailures": {
"count": 0
},
"scs-rocketmq.consumer.test-topic.consumedPerSecond": {
"count": 11,
"fifteenMinuteRate": 0.012163847780107841,
"fiveMinuteRate": 0.03614605351360527,
"meanRate": 0.3493213353657594,
"oneMinuteRate": 0.17099243039490175
},
"scs-rocketmq.producer.test-topic.totalSent": {
"count": 5
},
"scs-rocketmq.producer.test-topic.sentPerSecond": {
"count": 5,
"fifteenMinuteRate": 0.005540151995103271,
"fiveMinuteRate": 0.01652854617838251,
"meanRate": 0.10697493212602836,
"oneMinuteRate": 0.07995558537067671
},
"scs-rocketmq.producer.test-topic.sentFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
},
"scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
}
}
}
注意:要想查看统计数据需要在pom里加上 metrics-core依赖。如若不加,endpoint 将会显示 warning 信息而不会显示统计信息:
{
"warning": "please add metrics-core dependency, we use it for metrics"
}