分布式消息队列 RocketMQ 源码分析 —— 集成 Spring Cloud

2,046 阅读5分钟
原文链接: www.iocoder.cn
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 www.iocoder.cn/RocketMQ/sp… 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

坑已经挖好,就看 Spring Cloud Alibaba 更新这块。

貌似 github.com/spring-clou… 已经可以看到了。

看了下开发者是小马哥,瞬间就放心了~

如下内容,暂时先转载 《Spring Cloud Alibaba RocketMQ Binder 原理》

RocketMQ 介绍

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

RocketMQ 基本使用

  • 下载 RocketMQ

下载 RocketMQ最新的二进制文件,并解压

解压后的目录结构如下:

apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
  • 启动 NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
  • 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
  • 发送、接收消息

发送消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

发送成功后显示:SendResult [sendStatus=SEND_OK, msgId= …

接收消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

接收成功后显示:ConsumeMessageThread_%d Receive New Messages: [MessageExt…

  • 关闭 Server
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。

Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:Binder 和 Binding。

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。

比如 Kafka 的实现 KafkaMessageChannelBinderRabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder

  • Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

SCSt overview

Figure 1. Spring Cloud Stream

使用 Spring Cloud Stream 完成一段简单的消息发送和消息接收代码:

MessageChannel messageChannel = new DirectChannel();

// 消息订阅
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("receive msg: " + message.getPayload());
    }
});

// 消息发送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());

这段代码所有的消息类都是 spring-messaging 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。

Spring Cloud Stream 底层也是基于这段代码去做了各种抽象。

Spring Cloud Alibaba RocketMQ Binder 实现原理

1543560843558 24525bf4 1d0e 4e10 be5f bdde7127f6e6

Figure 2. RocketMQ Binder处理流程

RocketMQ Binder 的核心主要就是这3个类:RocketMQMessageChannelBinderRocketMQInboundChannelAdapterRocketMQMessageHandler

RocketMQMessageChannelBinder 是个标准的 Binder 实现,其内部构建 RocketMQInboundChannelAdapterRocketMQMessageHandler

RocketMQMessageHandler 用于 RocketMQ Producer 的启动以及消息的发送,其内部会根据 spring-messaging 模块内 org.springframework.messaging.Message 消息类,去创建 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message

在构造 org.apache.rocketmq.common.message.Message 的过程中会根据 org.springframework.messaging.Message 的 Header 构造成 RocketMQMessageHeaderAccessor。然后再根据 RocketMQMessageHeaderAccessor 中的一些属性,比如 tags、keys、flag等属性设置到 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message 中。RocketMQMessageHeaderAccessor 中的 Key 可以参考下面的表格进行获取或设置:

配置项 含义
ACKNOWLEDGEMENT 获取 Acknowledgement
TAGS Message Tags
KEYS Message Keys
ORIGINAL_ROCKETMQ_MESSAGE RocketMQ 消息对象 MessageExt
DELAY Message Delay Level
ROCKETMQ_FLAG Message Flag
ROCKETMQ_TRANSACTIONAL_ARG 事务消息中 LocalTransactionExecuter 中使用的参数
ROCKETMQ_SEND_RESULT 消息发送结果

RocketMQInboundChannelAdapter 用于 RocketMQ Consumer 的启动以及消息的接收。其内部还支持 spring-retry 的使用。

在消费消息的时候可以从 Header 中获取 Acknowledgement 并进行一些设置。

比如使用 MessageListenerConcurrently 进行异步消费的时候,可以设置延迟消费:

@StreamListener("input")
public void receive(Message message) {
    RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
    Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
    acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER);
    acknowledgement.setConsumeConcurrentlyDelayLevel(1);
}

比如使用 MessageListenerOrderly 进行顺序消费的时候,可以设置延迟消费:

@StreamListener("input")
public void receive(Message message) {
    RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
    Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
    acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
    acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}

Provider端支持的配置:

配置项 含义 默认值
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled 是否启用consumer true
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 “\ \ “ 分割(不填表示不进行tags的过滤,订阅所有消息)
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容,sql的优先级更高)
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting Consumer是否是广播模式 false
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly 顺序消费 or 异步消费 false

Endpoint支持

在使用Endpoint特性之前需要在 Maven 中添加 spring-boot-starter-actuator 依赖,并在配置中允许 Endpoints 的访问。

  • Spring Boot 1.x 中添加配置 management.security.enabled=false。暴露的 endpoint 路径为 /rocketmq_binder
  • Spring Boot 2.x 中添加配置 management.endpoints.web.exposure.include=*。暴露的 endpoint 路径为 /actuator/rocketmq-binder

Endpoint 会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。

{
    "runtime": {
        "lastSend.timestamp": 1542786623915
    },
    "metrics": {
        "scs-rocketmq.consumer.test-topic.totalConsumed": {
            "count": 11
        },
        "scs-rocketmq.consumer.test-topic.totalConsumedFailures": {
            "count": 0
        },
        "scs-rocketmq.producer.test-topic.totalSentFailures": {
            "count": 0
        },
        "scs-rocketmq.consumer.test-topic.consumedPerSecond": {
            "count": 11,
            "fifteenMinuteRate": 0.012163847780107841,
            "fiveMinuteRate": 0.03614605351360527,
            "meanRate": 0.3493213353657594,
            "oneMinuteRate": 0.17099243039490175
        },
        "scs-rocketmq.producer.test-topic.totalSent": {
            "count": 5
        },
        "scs-rocketmq.producer.test-topic.sentPerSecond": {
            "count": 5,
            "fifteenMinuteRate": 0.005540151995103271,
            "fiveMinuteRate": 0.01652854617838251,
            "meanRate": 0.10697493212602836,
            "oneMinuteRate": 0.07995558537067671
        },
        "scs-rocketmq.producer.test-topic.sentFailuresPerSecond": {
            "count": 0,
            "fifteenMinuteRate": 0.0,
            "fiveMinuteRate": 0.0,
            "meanRate": 0.0,
            "oneMinuteRate": 0.0
        },
        "scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": {
            "count": 0,
            "fifteenMinuteRate": 0.0,
            "fiveMinuteRate": 0.0,
            "meanRate": 0.0,
            "oneMinuteRate": 0.0
        }
    }
}

注意:要想查看统计数据需要在pom里加上 metrics-core依赖。如若不加,endpoint 将会显示 warning 信息而不会显示统计信息:

{
    "warning": "please add metrics-core dependency, we use it for metrics"
}