开发好几年,你真的懂MQ嘛(RabbitMQ为例)?看完别说难搞哦

598 阅读10分钟

大家平时也有用到一些消息中间件(MQ),但是对其理解可能仅停留在会使用 API 能实现生产消息、消费消息就完事了。

对 MQ 更加深入的问题,可能很多人没怎么思考过。今天以RabbitMQ为例,和大家一起深入了解MQ。

概念

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  • 可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布 确认。

  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过 Exchange 来路由消息的。对 于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  • 消息集群(Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker。

  • 高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节 点出问题的情况下队列仍然可用。

  • 多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT等等。

  • 多语言客户端(Many Clients):RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、 Ruby 等等。

  • 管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控 和管理消息 Broker 的许多方面。

  • 跟踪机制(Tracing):如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生 了什么。

  • 插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编 写自己的插件。

RabbitMQ 架构

使用场景

在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中,而不是像几年前一样页面卡死或报错给用户。

像这种排队结算就用到了消息队列机制,放入通道里面一个一个结算处理,而不是某个时间断突然涌入大批量的查询新增把数据库给搞宕机,所以RabbitMQ本质上起到的作用就是削峰填谷,为业务保驾护航。

为什么选择RabbitMQ

现在的市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,那问题来了为什么要选择RabbitMQ?

除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;

可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;

高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;

集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单;

社区活跃度高,根据网上资料来看,RabbitMQ也是首选;

工作机制

生产者、消费者和代理

在了解消息通讯之前首先要了解3个概念:生产者、消费者和代理。

  • 生产者:消息的创建者,负责创建和推送数据到消息服务器;

  • 消费者:消息的接收方,用于处理数据和确认消息;

  • 代理:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

消息发送原理

首先你必须连接到Rabbit才能发布和消费消息,那怎么连接和发送消息的呢?

你的应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,有点像程序连接数据库,使用Java有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。

信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

为什么不通过TCP直接发送命令?

对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。

如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。

你必须知道的Rabbit

想要真正的了解Rabbit有些名词是你必须知道的。

包括:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。

  • **ConnectionFactory(连接管理器):**应用程序与Rabbit之间建立连接的管理器,程序代码中使用;

  • **Channel(信道):**消息推送使用的通道;

  • **Exchange(交换器):**用于接受、分配消息;

  • Queue(队列):用于存储生产者的消息;

  • RoutingKey(路由键):用于把生成者的数据分配到交换器上;

  • BindingKey(绑定键):用于把交换器的消息绑定到队列上;

看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:

关于更多交换器的信息,我们在后面再讲。

消息持久化

Rabbit队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。

当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:

投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数2设置为true持久化;

设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数3设置为存储纯文本到磁盘;

消息已经到达持久化交换器上;

消息已经到达持久化的队列;

持久化工作原理

Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。

持久化的缺点

消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。

所以使用者要根据自己的情况,选择适合自己的方式。

怎么保证 MQ 消息不丢失?

使用了 MQ 之后,还要关心消息丢失的问题。这里我们挑 RabbitMQ 来说明一下吧。

生产者弄丢了数据

RabbitMQ 生产者将数据发送到 RabbitMQ 的时候,可能数据在网络传输中搞丢了,这个时候 RabbitMQ 收不到消息,消息就丢了。

RabbitMQ 提供了两种方式来解决这个问题:

**事务方式:**在生产者发送消息之前,通过`channel.txSelect`开启一个事务,接着发送消息。

如果消息没有成功被 RabbitMQ 接收到,生产者会收到异常,此时就可以进行事务回滚`channel.txRollback`,然后重新发送。假如 RabbitMQ 收到了这个消息,就可以提交事务`channel.txCommit`。

但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。

**另外一种方式就是通过 Confirm 机制:**这个 Confirm 模式是在生产者那里设置的,就是每次写消息的时候会分配一个唯一的 ID,然后 RabbitMQ 收到之后会回传一个 ACK,告诉生产者这个消息 OK 了。

如果 RabbitMQ 没有处理到这个消息,那么就回调一个 Nack 的接口,这个时候生产者就可以重发。

事务机制和 Confirm 机制最大的不同在于事务机制是同步的,提交一个事务之后会阻塞在那儿。

但是 Confirm 机制是异步的,发送一个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用 Confirm 机制的。

RabbitMQ 弄丢了数据

RabbitMQ 集群也会弄丢消息,这个问题在官方文档的教程中也提到过,就是说在消息发送到 RabbitMQ 之后,默认是没有落地磁盘的,万一 RabbitMQ 宕机了,这个时候消息就丢失了。

所以为了解决这个问题,RabbitMQ 提供了一个持久化的机制,消息写入之后会持久化到磁盘。

这样哪怕是宕机了,恢复之后也会自动恢复之前存储的数据,这样的机制可以确保消息不会丢失。

设置持久化有两个步骤:

第一个是创建 Queue 的时候将其设置为持久化的,这样就可以保证 RabbitMQ 持久化 Queue 的元数据,但是不会持久化 Queue 里的数据。

第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

但是这样一来可能会有人说:万一消息发送到 RabbitMQ 之后,还没来得及持久化到磁盘就挂掉了,数据也丢失了,怎么办?

对于这个问题,其实是配合上面的 Confirm 机制一起来保证的,就是在消息持久化到磁盘之后才会给生产者发送 ACK 消息。

万一真的遇到了那种极端的情况,生产者是可以感知到的,此时生产者可以通过重试发送消息给别的 RabbitMQ 节点。

消费端弄丢了数据

RabbitMQ 消费端弄丢了数据的情况是这样的:在消费消息的时候,刚拿到消息,结果进程挂了,这个时候 RabbitMQ 就会认为你已经消费成功了,这条数据就丢了。

对于这个问题,要先说明一下 RabbitMQ 消费消息的机制:在消费者收到消息的时候,会发送一个 ACK 给 RabbitMQ,告诉 RabbitMQ 这条消息被消费到了,这样 RabbitMQ 就会把消息删除。

但是默认情况下这个发送 ACK 的操作是自动提交的,也就是说消费者一收到这个消息就会自动返回 ACK 给 RabbitMQ,所以会出现丢消息的问题。

所以针对这个问题的解决方案就是:关闭 RabbitMQ 消费者的自动提交 ACK,在消费者处理完这条消息之后再手动提交 ACK。

这样即使遇到了上面的情况,RabbitMQ 也不会把这条消息删除,会在你程序重启之后,重新下发这条消息过来。