个人吐血系列-总结RocketMQ

3,933 阅读9分钟

一般面试问消息队列,都是结合自己的项目进行回答的...最好有个项目有消息队列的中间件.本项目使用了RocketMQ

大纲

大纲
大纲

什么是消息队列?消息队列的主要作用是什么?

我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性

  • 异步处理:非核心流程异步化,提高系统响应性能
  • 应用解耦
    • 系统不是强耦合,消息接受者可以随意增加,而不需要修改消息发送者的代码。消息发送者的成功不依赖消息接受者(比如有些银行接口不稳定,但调用方并不需要依赖这些接口)
    • 消息发送者的成功不依赖消息接受者(比如有些银行接口不稳定,但调用方并不需要依赖这些接口)
  • 最终一致性:最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。
    • 先写消息再操作,确保操作完成后再修改消息状态。定时任务补偿机制实现消息可靠发送接收、业务操作的可靠执行,要注意消息重复与幂等设计
    • 所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。
  • 广播:只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情
  • 流量削峰与监控:当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。
  • 日志处理:将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题
  • 消息通讯:消息队列一般都内置了高效的通信机制,因此也可以用于单纯的消息通讯,如实现点对点消息队列或者聊天室等。

推荐浅显易懂的讲解

kafka、activemq、rabbitmq、rocketmq都有什么区别?

  • ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
  • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
  • RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ 挺好的
  • kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。

MQ在高并发情况下,假设队列满了如何防止消息丢失?

  1. 生产者可以采用重试机制。因为消费者会不停的消费消息,可以重试将消息放入队列。
  2. 死信队列,可以理解为备胎(推荐)
    • 即在消息过期,队列满了,消息被拒绝的时候,都可以扔给死信队列。
    • 如果出现死信队列和普通队列都满的情况,此时考虑消费者消费能力不足,可以对消费者开多线程进行处理。

谈谈死信队列

死信队列用于处理无法被正常消费的消息,即死信消息

当一条消息初次消费失败,消息队列 RocketMQ 版会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 版不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列

死信消息的特点

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列的特点

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 版不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

消息队列 RocketMQ 版控制台提供对死信消息的查询、导出和重发的功能。

消费者消费消息,如何保证MQ幂等性?

幂等性

消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;

解决方案

  • MQ消费者的幂等行的解决一般使用全局ID 或者写个唯一标识比如时间戳 或者UUID 或者订单
  • 也可利用mq的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。
  • 给消息分配一个全局id,只要消费过该消息,将 < id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

使用异步消息时如何保证数据的一致性

  1. 借助数据库的事务:使用异步消息怎么还能借助到数据库事务?这需要在数据库中创建一个本地消息表,这样可以通过一个事务来控制本地业务逻辑更新和本地消息表的写入在同一个事务中,一旦消息落库失败,则直接全部回滚。如果消息落库成功,后续就可以根据情况基于本地数据库中的消息数据对消息进行重投了。关于本地消息表和消息队列中状态如何保持一致,可以采用 2PC 的方式。在发消息之前落库,然后发消息,在得到同步结果或者消息回调的时候更新本地数据库表中消息状态。然后只需要通过定时轮询的方式对状态未已记录但是未发送的消息重新投递就行了。但是这种方案有个前提,就是要求消息的消费者做好幂等控制,这个其实异步消息的消费者一般都需要考虑的。
  2. 除了使用数据库以外,还可以使用 Redis 等缓存。这样就是无法利用关系型数据库自带的事务回滚了。

RockMQ不适用Zookeeper作为注册中心的原因,以及自制的NameServer优缺点?

  1. ZooKeeper 作为支持顺序一致性的中间件,在某些情况下,它为了满足一致性,会丢失一定时间内的可用性,RocketMQ 需要注册中心只是为了发现组件地址,在某些情况下,RocketMQ 的注册中心可以出现数据不一致性,这同时也是 NameServer 的缺点,因为 NameServer 集群间互不通信,它们之间的注册信息可能会不一致
  2. 另外,当有新的服务器加入时,NameServer 并不会立马通知到 Produer,而是由 Produer 定时去请求 NameServer 获取最新的 Broker/Consumer 信息(这种情况是通过 Producer 发送消息时,负载均衡解决)
  3. 包括组件通信间使用 Netty 的自定义协议
  4. 消息重试负载均衡策略(具体参考 Dubbo 负载均衡策略)
  5. 消息过滤器(Producer 发送消息到 Broker,Broker 存储消息信息,Consumer 消费时请求 Broker 端从磁盘文件查询消息文件时,在 Broker 端就使用过滤服务器进行过滤)
  6. Broker 同步双写和异步双写中 Master 和 Slave 的交互

创作不易哇,觉得有帮助的话,给个小小的star呗。 github.com/DreamCats/J…

小Demo项目结合了RocketMQ消息队列:github.com/DreamCats/s…