Java面试之消息队列

1,874 阅读4分钟

1.应用场景

  1. 解耦
  2. 异步
  3. 流量消峰
  4. 日志记录

2.重复消息的解决方案

  1. 消费端处理消息的业务逻辑保持幂等性
  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

3.有序性

  1. Producer对于需要顺序的消息发送到同一个queue中
  2. Consumer使用MessageListenerOrderly来对消息进行有序消费

4. 如何实现分布式事务

  1. 发送方向 MQ 服务端发送消息。
  2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半 消息,订阅方将不会接受该消息。
  5. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

5.push和pull模式

  1. push模式:客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端。
  2. pull模式:客户端不断的轮询请求服务端,来获取新的消息。
  3. 但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息。

6. pull方式实现,RocketMQ如何保证消息的实时性呢?

长轮询即是在请求的过程中,若是服务器端数据并没有更新,那么则将这个连接挂起,直到服务器推送新的 数据,再返回,然后进入循环周期。 客户端像传统轮询一样从服务端请求数据,服务端会阻塞请求不会立刻返回,直到有数据或超时才返回给客 户端,然后关闭连接,客户端处理完响应信息后再向服务器发送新的请求。

7. 消息模式

DefaultMQPushConsumer实现了自动保存offset值以及实现多个consumer的负载均衡。

//设置组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM")

通过groupname将多个consumer组合在一起,那么就会存在一个问题,消息发送到这个组后,消息怎么分配呢? 这个时候,就需要指定消息模式,分别有集群和广播模式。

  • 集群模式
    同一个 ConsumerGroup(GroupName相同) 里的每 个 Consumer 只消费所订阅消息的一部分内容, 同 一个 ConsumerGroup 里所有的 Consumer消费的内容合起来才是所订阅 Topic 内容的整体, 从而达到 负载均衡的目的 。
  • 广播模式
    同一个 ConsumerGroup里的每个 Consumer都 能消费到所订阅 Topic 的全部消息,也就是一个消息会 被多次分发,被多个 Consumer消费。
// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

8. 存储机制

8.1 消息数据的存储

在RocketMQ中,消息数据是保存在磁盘文件中,为了保证写入的性能,RocketMQ尽可能保证顺序写入,顺序写入的效率比随机写入的效率高很多。
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,CommitLog是真正存储数据的文件, ConsumeQueue是索引文件,存储数据指向到物理文件的配置。

8.2 同步刷盘与异步刷盘

  • 同步刷盘
    在返回写成功状态时,消息已经被写入磁盘 。 具体流程是:消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程 执行完成后唤醒等待的线程,返回消息写成功的状态 。
  • 异步刷盘
    在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大 当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
  • broker配置文件中指定刷盘方式
    flushDiskType=ASYNC_FLUSH -- 异步
    flushDiskType=SYNC_FLUSH -- 同步