分布式消息队列实现概要

2,399 阅读7分钟

前言

消息队列允许应用之间通过发消息的方式异步通讯,简单来说,发送者和消费者的生产效率通常是不一致的,那么我们就需要一种抽象模型去解耦,因此这里就可以引入消息队列,将任务暂时写入消息中间件,待消费者慢慢处理。消息中间件目前已经有了很多选择,例如RocketMQ、Kafka、Pulsar等等,Message queue带来很多便利的同时,也引入了一些技术上的复杂性,就像一个黑盒子一样,如果不能理解其原理,如果碰到了问题查起来也很蛋疼,今天我们就来看看如何着手实现一个简单的消息队列

正文

首先我们看看Kafka以及RocketMQ的包结构,看看一个分布式消息队列究竟需要哪些组件
image
image

存储层

消息队列最核心的组件之一就是存储层,消息如何落地、如何读取,这里的技术选型是比较重要的一点,例如RocketMQ以及Kafka都是选择存储到本机,也就是本地文件系统,而Pulsar则是选择存储到分布式文件系统bookKeeper中,当然也有一些选择了分布式KV系统甚至是数据库,例如Redis自身也是支持publish/consume模型的,具体的选择哪一种实现方式只要还是看自己的业务场景,例如如果可靠性要求较高但对性能并不那么敏感的场景可以选择数据库作为存储介质。
选择本地文件系统去实现一个分布式消息队列相对来说是这几种最复杂的,不仅仅需要自己实现文件的IO细节,对于复制、一致性(当出现网络异常或者系统异常宕机时如何根据日志恢复系统的状态)也都需要自己实现,而这每一部分都需要相当一部分精力去研究,我们这次只是先首先一个比较简单的原型,对于这个方案之后有时间会搞。
基于分布式KV的方案相对来说也是不错的方案,性能很不错,而且接口也比较人性化,但是可靠性差了一点,对于类似交易、缓存同步这种对可靠性要求比较高的场景来说不那么使用。
image

基于数据库的方式性能上会有很大的损失,DB的数据结构本质上就不适合去实现消息队列。这次我们选择利用分布式文件系统作为存储介质,泪如HDFS、Apache BookKeeper等,我们分析一下Message queue的场景,单线程写-多线程读,这里需要引入topic分区的概念,一般如果某些topic比较活跃,吞吐量比较高,那么我们可以将消息分区,实现思路一般是将topic再从细粒度切分为子topic,并将每个子topic分布到不同的broker上,从而实现性能的线性提升,也就是说这里的单线程写具体指的是单个分区,多线程读相对来说比较容易理解,而HDFS正好适合这个场景,而且我们也不用去管replica、写分片、刷盘策略等等,减少了很多实现的复杂性,BookKeeper在这方面是不错的选择。
image

客户端API实现

对于使用者而言,接触到的更多的是客户端暴漏的API,而客户端和服务器端Broker也需要一种方式通讯,对于RocketMQ以及Kafka都是选择实现了自定义的协议,消息队列的如果想要达到极高的吞吐量,实现一种高性能的网络通讯框架是相当重要的一环,RocketMQ是基于Netty之上构建的,而Kafka是直接基于NIO实现的,相对来说要复杂一点,如果看过源码的话会有所了解,Kafka客户端提交之后是先放到一个本地队列,然后根据broker、topic、分区信息等合并提交到服务器端,而Pulsar印象中是基于Protocol buffer实现的,这样相对自定义协议很多好处,首先如果协议后期实现过程有变动的话,如何兼容老的协议等这些细节已经由Protocol buffer帮你解决了,另外很重要的一点是,Protocol buffer可以帮你生成各个不同语言的API,如果是自定义协议这个又要费相当的精力去实现。

一致性

对于消息队列的场景,每条消息都是一旦落盘之后,就不再支持更新操作,对于读取也都是顺序读,consumer抓取到的消息也都是已经落盘的或者已经commit的记录,因此一致性在消息队列中相对来说还是比较容易实现的。

高可用

首先就存储层来说,我们的技术选型就已经决定了本质上就是高可用的,因为BookKeeper本身就支持指定复制到几个slave以及ack的机制,例如需要写入到所有的分区才向客户端返回成功,而对于broker端,因为我们的消息队列是存储和计算分离的,也就是说broker本身是无状态的,当producer/consumer连接的broker宕机或者网络超时的断开连接时,可以直接由另一个broker接着提供服务,当然这里还有很多细节问题,但是复杂性相对RocketMQ等已经降低了很多。

消费者进度存储

我们知道消息存在三种语义: at most once、at least once、exactly once,那么消费者offset的存储于同步机制就一定程度上决定了我们具体是什么语义,例如发送端,如果发送失败不重试的话就是 at most once,如果发送失败选择一定次数的重试,那么就是at least once,这里就可能造成消息重复落盘从而造成重复消费,例如说消息实际已经落盘但是发送提交响应的过程出现了网络异常,就会出现这种情况,而exactly once的场景就会比较复杂一点。我们回到offset的场景,RocketMQ以及Kafka默认都是定时去同步当前的消费进度,那么这个消费进度存储到哪里又是一个问题。 RocketMQ的方式是存储到本地文件系统中,Kafka在0.8版本之前是选择存储到了Zookeeper中,后面改成存储到另外一个topic中,那么这两种方式有什么优缺点呢:

  1. 性能/横向扩展: Zookeeper是一个一致性系统,它保留的API也都是基于key/value的格式,ZK本质上是不支持大量写的,同时ZK不支持横向扩展,因为每个节点都会同步所有的transaction 并保持整个数据集,实际上ZK是基于单个日志写并同步复制到其他节点的分布式系统。ZK的吞吐量据我测试差不多1W/S写左右,但是假如说我们有几十上百万个topic,每秒同步一次消费进度,这个时候ZK已经完全不能满足需要,而且并不能横向扩展,只能通过分片的方式解决,而这又引入了一个代理层
  2. 实现的复杂性: 基于本地文件系统性能虽然可观,但是和消息存储同理,需要考虑很多实现的细节

因此我们这里参考Kafka最新的实现,我们选择将消费进度也存储到BookKeeper中,这样就可以支持大量的写,而且支持线性扩展,BK也会将小的log合并存储到一个文件中,避免了性能被一些不活跃的topic所影响。

总结

本文简单讲解了实现一个分布式消息队列所需要考虑的一些方面,例如一致性、高可用、消费语义、通讯模型等等,但实际去写一个Message queue 所需要的远远不止这些,建议先从源码阅读开始,先理清整体的架构、脉络,再去研究细节、看代码,最好将每个项目的源码在IDE中实际的去打断点、调试,一步一步的了解到从发送消息到接收到消息的这一整个过程都发生了什么。