I.场景说明(为何要引入)
- 解耦
>> 降低系统耦合度
2. 异步
>> 提升系统响应速度
3. 削峰
>> 抵抗请求高峰时期
为什么使用消息队列?消息队列有什么优点和缺点?Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么优点和缺点?
II.众多消息队列,孰优孰略?(如何选择)
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级 | 万级 | 十万级,支持高吞吐 | 十万级,支持高吞吐 |
时效性 | ms级 | 微秒级 | ms级 | ms级以内 |
可用性 | 高可用(基于主从架构) | 高可用(基于主从架构) | 高可用 分布式架构 | 高可用,分布式,一个数据多个副本 |
消息可靠性 | 有较低概率丢失 | 基本不丢 | 参数配置可做到0丢失 | 参数配置可以做到0丢失 |
功能支持 | 没经过大规模吞吐量场景的验证,社区也不是很活跃 | 基于 erlang 开发,并发能力很强,性能极好,延时很低,开源,比较稳定的支持,活跃度也高; | 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用,社区活跃度很高 |
综上,各种对比之后,有如下建议:
最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃;
后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,
几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,
但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司
技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,
绝对不会黄。
所以中小型公司,技术实力较为一般,技术挑战不是特别高,
用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,
绝对不会黄,何况几乎是全世界这个领域的事实性规范。
III.引入消息队列可能遇到的问题
-
如何保证消息的高可用?
RabbitMQ 可以开启镜像集群模式,在镜像集群模式下, 你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上, 就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像, 包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候, 都会自动把消息同步到多个实例的 queue 上。
-
如何保证消息重复消费的问题(幂等性)?
首先你的明白重复消费会出现什么问题,为什么要保证幂等性。举个例子: 如果消费者干的事儿是拿一条数据就往数据库里写一条,你可能就把数据在数据库里插入了 2 次, 那么数据就错了。其实重复消费不可怕, 可怕的是你没考虑到重复消费之后,怎么保证幂等性。 解决: 每个消息加一个全局唯一的序号,根据序号判断这条消息是否处理过, 然后再根据自己的业务场景进行处理。或更新或丢弃。
-
如何处理消息丢失问题(消息的可靠性传输)?
- 消息生产者把消息搞丢了: RabbitMQ开启 confirm 模式,如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息, 告诉你说这个消息 ok 了。 如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。 而且你可以结合这个机制自己在内存里维护每个消息id的状态, 如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
- MQ自己搞丢了数据: RabbitMQ可以开启持久化
- 消费端丢失了数据: RabbitMQ默认是自动ack的,也就是说消息到了消费端,就会自动确认已经消费了这条消息, 这时候可能你消费端刚拿到数据,然后挂了,那这条消息不就丢失了。 关闭RabbitMQ的自动确认,每次消费端逻辑处理完的时候, 在程序里确认消费完成,通知MQ,这样就保证了在消费端不会丢失了
-
如何保证消息的顺序性?
- RabbitMQ: 拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点; 或者就一个 queue 但是对应一个 consumer
- Kafka: 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低, 一般不会用这个。 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程, 每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
-
如何处理消息积压的问题?消息过期失效问题?
这种问题出现一般就是消费端出现问题了,导致大量消息积压。 解决办法: 修复consumer,恢复其消费能力,然后等待consumer消费完。 什么?等待太久了? 消息积压到几百万到上千万数据时,那就有点蛋疼了,确实要好几个小时,那就再提供一种解决方案 先修复 consumer 的问题,确保其恢复消费速度,然后新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据, 消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍 数量的 queue。接着临时征用 10 倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。 这种做法相当于是临时将 queue 资源 和 consumer 资源扩大 10 倍,以正常的 10倍速度来消费数据。等快速消费完积压数据之后, 得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息 如何处理消息过期失效问题 消息失效这没得办法了呀,只能怪当初设置失效时间的时候没考虑到这个问题了, 无关紧要的还好,要是重要的类似于订单相关方面的消息,可以写个临时程序, 根据订单号把丢失的相关消息全查出来,补上。