大写的服,看完这篇你还不懂RocketMQ算我输

2,730 阅读16分钟

目录

  1. RocketMQ介绍
  2. RocketMQ概念
  3. 为什么要用RocketMQ?
    1. 异步解耦
    2. 削峰填谷
    3. 分布式事务最终一致性
    4. 数据分发
  4. RocketMQ架构
  5. RocketMQ消息类型
    1. 普通消息
    2. 顺序消息
    3. 定时消息
    4. 事务消息
  6. 最佳实践
    1. 消息重试
    2. 消息过滤
    3. 消费模式
    4. 消费幂等
  7. 本地事务消息封装
  8. 参考代码

RocketMQ 介绍

Apache RocketMQ 是一款 低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

RocketMQ 概念

  • Topic:消息主题,用于将一类的消息进行归类,比如订单主题,就是所有订单相关的消息都可以由这个主题去承载,生产者向这个主题发送消息。
  • 生产者:负责生产消息并发送消息到 Topic 的角色。
  • 消费者:负责从 Topic 接收并消费消息 的角色。
  • 消息:生产者向 Topic 发送的内容,会被消费者消费。
  • 消息属性:生产者发送的时候可以为消息自定义一些业务相关的属性,比如 Message Key 和 Tag 等。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

为什么要使用 RocketMQ?

异步解耦

随着微服务架构的流行,服务之间的关系梳理非常重要。异步解耦可以降低服务之间的耦合程度,同时也能提高服务的吞吐量。

使用异步解耦的业务场景非常多,因为每个行业的业务都会不太一样,以一些比较通用的业务来说明相信大家都能理解。

比如电商行业的下单业务场景,以最简单的下单流程来说,下单流程如下:

  1. 锁库存
  2. 创建订单
  3. 用户支付
  4. 扣减库存
  5. 给用户发送购买短信通知
  6. 给用户增加积分
  7. 通知商家发货

我们以下单成功后,用户进行支付,支付完成会有个逻辑叫支付回调,在回调里面需要去做一些业务逻辑。首先来看下同步处理需要花费的时间,如下图:

图片

上面的下单流程从 3 到 5 都是可以采用异步流程进行处理,对于用户来说,支付完成后他就不需要关注后面的流程了。后台慢慢处理就行了,这样就能简化三个步骤,提高回调的处理时间。

图片

削峰填谷

削峰填谷指的是在大流量的冲击下,利用 RocketMQ 可以抗住瞬时的大流量,保护系统的稳定性,提升用户体验。

在电商行业,最常见的流量冲击就是秒杀活动了,利用 RocketMQ 来实现一个完整的秒杀业务还是与很多需要做的工作,不在本文的范围内,后面有机会可以单独跟大家聊聊。想告诉大家的是像诸如此类的场景可以利用 RocketMQ 来扛住高并发,前提是业务场景支持异步处理

图片

分布式事务最终一致性

众所周知,分布式事务有 2PC,TCC,最终一致性等方案。其中使用消息队列来做最终一致性方案是比较常用的。

在电商的业务场景中,交易相关的核心业务一定要确保数据的一致性。通过引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

数据分发

数据分发指的是可以将原始数据分发到多个需要使用这份数据的系统中,实现数据异构的需求。最常见的有将数据分发到 ES, Redis 中为业务提供搜索,缓存等服务。

除了手动通过消息机制进行数据分发,还可以订阅 Mysql 的 binlog 来分发,在分发这个场景,需要使用 RocketMQ 的顺序消息来保证数据的一致性。

图片

RocketMQ 架构

图片

图片来源阿里云官方文档

  • Name Server:是一个几乎无状态节点,可集群部署,在消息队列 RocketMQ 版中提供命名服务,更新和发现 Broker 服务。就是一个注册中心。
  • Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。
  • 生产者:与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。
  • 消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。

RocketMQ 消息类型

RocketMQ 支持丰富的消息类型,可以满足多场景的业务需求。不同的消息有不同的应用场景,下面为大家介绍常用的四种消息类型。

普通消息

普通消息是指 RocketMQ 中无特性的消息。当没有特殊的业务场景,使用普通消息就够了。如果有特殊的场景,就可以使用特殊的消息类型,比如顺序,事务等。

同步发送

同步发送:消息发送方发送出去一条消息,会同步得到服务端返回的结果。

异步发送

异步发送:消息发送方发出去一条消息,不用等待服务端返回结果,可以接着发送下一条消息。发送方可以通过回调接口接收服务端响应,并处理响应结果。

单向发送

单向发送:消息发送方只负责发送消息,发送出去后就不管了,这种方式发送速度非常快,存在丢失消息的风险。

顺序消息

顺序消息是指生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被消费者接收到。

比如数据分发的场景,如果我们订阅了 Mysql 的 binlog 来进行数据异构。消息要是没有顺序,就会出现数据错乱问题。

比如新增一条 id=1 的数据,然后马上删除。这样就产生了两条消息。正常的消费顺序是先新增,然后删除,此时数据是没有的。如果消息没有顺序,删除的先被消费了,然后消费新增的,此时数据还在,没被删除掉,就会导致不一致。

定时消息

定时消息是指消息具备定时发送的功能,当消息发送到服务端后,不会立即投递给消费者。而是要等到消息指定的时间后才会投递给消费者进行消费。

延迟消息也就是定时消息,定时消息是定在某个时间点进行发送,比如 2020-11-11 12:00:00 发送。

延迟消息一般是在当前发送时间的基础上延迟多久进行发送,比如当前时间是 2020-09-10 12:00:00,延迟 10 分钟,那么消息发送成功后将在 2020-09-10 12:10:00 进行投递给消费者。

定时消息可以在订单超时未支付自动取消等场景使用。

事务消息

RocketMQ 提供类似 X/Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。

交互流程:

图片

图片来源阿里云官方文档

  1. 发送方首先发送半事务消息到 RocketMQ 服务端。

  2. RocketMQ 服务端接收到消息,然后将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息,不会投递给消费方。

  3. 收到半事务消息的 Ack 后,发送方开始执行本地事务逻辑。

  4. 发送方根据本地事务执行结果向服务端提交二次确认,如果本地事务执行成则进行消息的 Commit,如果执行失败则进行消息的 Rollback,服务端收到 Commit 状态则将半事务消息标记为可投递,消费方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,消费方将不会收到该消息。

  5. 如果出现意外情况,步骤 4 没有进行消息的二次确认,等待固定时间后服务端将对该消息发起消息回查。

  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。

最佳实践

消息重试

消息在消费方消费失败后,RocketMQ 服务端会重新进行消息的投递,知道消费者成功消费消息,当然重试有次数限制,默认 16 次。

消息重试在一定程度上保证了消息不丢失,通过重试来达到最终被消费的目的。需要注意的是消费者在消费的时候一定要等本地业务成功后才能进行 ACK(消费确认),不然就会出现消费失败,但是已经 ACK,消息将不会重复投递。

如果采取异步消费的方式,需要进行异步转同步,等异步操作完才进行 ACK,具体可以参考我之前写的一篇文章mp.weixin.qq.com/s/Bbh1GDpmk…

最后需要做好对应的监控,如果重试了 4,5 次还是失败的,基本上后面重试也是失败的。这个时候需要让开发人员知道,该人工处理的就人工介入。或者直接监控死信队列。

消息过滤

消息主题,一般用于一类消息的统一分类。比如订单主题,但是订单下的消息会分为很多种。比如创建订单,取消订单等。

不同类型的消息有不同的业务处理,我们可以统一定义消息格式,然后通过一个字段去区分消息类型来做不同的业务逻辑。不好的点在于所有消息都会推送到消费方,不能按需消费。

在 RocketMQ 中可以给消息指定 tag,通过 tag 来区分消息类型。消费者可以根据 Tag 在 RocketMQ 服务端完成消息过滤,以确保消费者最终只消费到其关注的消息类型。

我曾经遇到过一个 tag 没有正确使用的方式,只有一个 MQ 实例,用 tag 来区分环境。所有消息都在一个主题中,测试环境消费测试环境的 tag,线上消费线上的 tag。

这种方式的问题在于消息没做隔离,线上线下的消息都在一起。另一个就是 tag 被固定成了环境的区分,无法用于消息类型场景,导致只能建多个 topic 来承载多个业务消息类型。

图片

消费模式

RocketMQ 消费模式有两种,集群消费和广播消费。

集群消费:

图片

消费者部署了多个实例我们称之为一个集群,集群消费只会被其中的某一个实例进行消费。

适合大部分的业务场景,大部分的场景我们的消息只允许被消费一次,而且只能有一个消费者去消费,比如支付回调场景,如果一个消息被多个实例同时消费,那么就会出现同时去修改订单状态,同时去扣减库存的情况。

广播消费:

图片

广播消费会让集群中每个实例都消费一次。

比如我们使用了本地缓存,当数据变更的时候,我们需要刷新每个节点本地的缓存,所以每个节点都需要收到消息。

消费幂等

幂等问题,无论是在 API 请求场景还是在消息消费场景,都会遇到。一条消息不能重复消费多次这个肯定是要保证的,因为我们不能保证消息发送方不发送多次,也不能保证消息不重复投递。

RocketMQ 的 Exactly-Once 投递语义,就是用于解决幂等问题。Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。

最佳的幂等处理方式还是需要有一个唯一的业务标识,虽然每条消息都有 MessageId,但是不建议用 MessageId 来做幂等判断,在发送消息的时候,可以为每条消息设置一个 MessageKey,这个 MessageKey 就可以用来做业务的唯一标识。

关于幂等怎么处理,就不细讲了。可以参考我之前写的一篇文章mp.weixin.qq.com/s/9fhqnbeXP…,通用的幂等实现方案。

图片

本地事务消息封装

上面介绍了事务消息,RocketMQ 的事务消息采用了二阶段提交的方式。并且结合了消息反查的机制来确保最终一致性。

从使用层面来说,每个业务场景都要去实现一个反查的逻辑,有点烦。

下面介绍另一种经常被使用的方式,就是本地事务消息。本地消息表这个方案最初是 ebay 提出的,本地事务消息需要在服务对应的数据库中创建一个消息表,发送消息的时候不是真正的将消息发送给 MQ,而是往消息表中插入一条消息数据。

插入的动作跟本地的业务逻辑是同一个事务,如果本地事务执行成功,消息才会落表成功,才会发送给 MQ, 本地事务失败,消息数据回滚。

然后需要有一个专门的程序去拉取消息表中未发送的消息投递给 MQ,如果投递失败,可以一直重试,直到成功或者人工介入。

图片

消息写到消息表,然后会一直给 MQ 发送,这个步骤没问题。如果 MQ 收到消息后,消息还在 PageCache 中的时候,Broker 宕机了,这个时候是会出现消息丢失。当然你也可以使用同步刷盘等方式来避免丢失。假如我们就是异步刷盘,有办法保证消息不丢失吗?

前面我们提到,RocketMQ 的事务消息会有回查的机制,消息表的方式,也需要有一个机制来保证消息被消费了,否则就需要不断的重试去发送消息,直到消息被消费。

在消息表中需要有一个字段来标识当前这条消息的状态,比如 未发送,已发送,已消费。当消息还是未发送的时候就会被发送到 MQ, 如果发送成功了,状态就是已发送。但是过了几分钟,状态还是已发送,这个时候就要去做一些动作了。

这个场景下,有可能是消费者跟不上生产的速度,消息堆积了,导致消息一直没被消费。另一种可能就是消息是不是丢失了?

可以获取对应的消息堆积数据来判断是否消息堆积了,如果不是就重新发送消息给 MQ,知道消息被消费。

问题是消息被消费了,我怎么知道?

像我是用的云服务,是有对应的 Open API 可以直接查询消息轨迹。开源的应该也有,没有仔细去研究,跟商业版应该差不多。

根据消息轨迹就可以知道消息有没有被消费,到此为止流程结束。消息发送给 MQ 如果失败会重试,消息如果长时间没消费,也会重新发送,即使最后进入了死信队列,也可以通过死信队列的监控来人工干预,一定会是最终一致性。

跟自带的事务消息比,本地消息表的方式不需要实现回查逻辑,但是要增加消息表,同时也要配套各种发送,检查等逻辑,也挺麻烦了。特别是当消息量大的时候,如何快速的将消息表中的消息发送出去,也需要做很多处理,简单的查表轮询在量大的情况下不太适用。

两种方式都可以使用,能实现我们要的目的即可。

参考代码

本地事务消息相关的代码可以微信搜索「猿天地」,回复关键字「kitty」获取即可。

码字不易,可以的话来个三连击,感谢!

关于作者:尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号猿天地发起人。

我整理了一份很全的学习资料,感兴趣的可以微信搜索「猿天地」,回复关键字 「学习资料」获取我整理好了的 Spring Cloud,Spring Cloud Alibaba,Sharding-JDBC 分库分表,任务调度框架 XXL-JOB,MongoDB,爬虫等相关资料。