阅读 700

拍拍贷消息系统原理与应用

前言

在5月12日的Java开发者大会上,除了我本人进行分享之外,还有其他5位优秀的老师也有精彩的分享。

今天我将根据自己的记忆来给大家分享下 拍拍贷 基础框架研发资深专家 李乘胜老师的演讲内容。

李乘胜老师演讲的主题是:拍拍贷消息系统原理与应用,也就是拍拍贷内部使用的消息系统,自研发的,没有用市场上开源的。

我们都知道,对于大厂来说,是有绝对的自研发的技术实力。自研发往往能更贴近公司实际的业务使用场景,所以很多大厂都选择了自研。

有一个好消息就是后面这套消息系统后面会开源,我们又多了一个非常优秀的消息系统。但是在目前为止还没有对应的文章来介绍这套消息系统,所以今天的文章大家要认真阅读哦!

内容有点偏多,不会全部写出来,感兴趣的朋友可以加我微信 jihuan900 我发原始的PPT给你参考学习。

介绍

拍拍贷消息系统是拍拍贷中间件团队,在广泛调研业界开源消息系统的基础上,结合公司现状和自身实践,研发的一款轻量级消息系统。具有丰富的功能和完善的治理

对于框架的选型我们最在意的就是它的性能怎么样?有没有被大规模使用?

  • 支撑拍拍贷每天高峰27+亿消息量、1000多G的消息数据量。
  • 日常高峰2万Tps写入。

消息系统你最在意的问题有哪些

  1. 消息发送慢怎么办? 有监控功能,发现问题后可以扩容Broker或者扩容队列
  2. 消息堆积了,如何快速知晓与处理? 这个可以根据客户端的监控查看消费耗时和机器负载来觉定是否加队列,加线程数,或者加消费实例
  3. 失败消息如何处理? 独立的失败队列,重试
  4. 如何保障消息的高可靠? 数据库保证

上面4点是李老师PPT上介绍的,我在这边补充几点:

  • 如何保证消息的有序性? 目前消息存储在Mysql中,id是自增长,消费的时候从小到大消费,只能保证单队列有序性
  • 如何实现延迟消息? 可以在后台配置延迟多久消费,文章后面会进行讲解
  • 能否支持消息回溯? 后台修改偏移量
  • 部署维护是否方便? 这个部署运维简单,分库分表不用自己考虑,portal有部署脚本

上面是我们在选用消息系统的时候通常都会去考虑的问题,目前市面上最常见的有Kafka、RabbitMQ、RocketMQ 等,那么拍拍贷的消息系统跟这些开源的区别在哪呢?下面我们一起来了解下这款消息系统的架构设计。

架构设计

架构设计

拍拍贷消息系统的设计还是简洁易懂的,消息存储直接用了数据库来实现,不用考虑存储这块的复杂性。

发布消息使用Http协议,支持所有语言,消息订阅也是Http协议,目前采取的是主动拉取消息的方式,批量拉取。

Broker 是无状态的,可以搭建集群,水平扩展,无单点问题

在我的书《Spring Cloud微服务-全栈技术与案例解析》中也有对消息可靠性的介绍,大概的思路跟今天介绍的消息系统差不多,都是消息先落地到数据库中。

我的是将数据库中的消息投递到消息队列,通过消息队列来消费,消费完之后手动确认消费加重试来保证可靠性,当然在里面也可以做很多治理的功能。我这样做主要是对于生产方来说屏蔽了队列,封装成了消息服务,但是对于消费方来说还是要关注队列的存在。

拍拍贷这套就完全抛弃了第三方的消息队列,消费消息也是自己开发的,拉取模式。

消息复用与偏移

每个消费方会记录自己的偏移量,后台还可以动态修改偏移量来达到消息回溯的功能。

功能点

动态启停消费

动态调整偏移

支持延时消息,支持多线程消费,动态调整

消息消费情况查看

指定失败消息重新消费

监控治理

在监控这块,我们最关心的有以下几点:

  • 消费失败告警
  • 消息堆积告警
  • Topic消息报表, 每天的消费情况,性能等
  • 发送效率慢,怎么排查?

对于一些告警,该消息系统都支持了,而且有非常多丰富的报表功能,监控对接了Cat,排查问题非常方便。

客户端发送

客户端发送消息非常简单,有现成封装好了的SDK

MqClient.publish("Test1", "", new ProducerDataDto(“111111"));
复制代码

客户端消费

配置订阅

<?xml version="1.0" encoding="UTF-8" ?>
<messageQueue>
   <consumer groupName="Test1Sub">
      <topics>
         <topic name="Test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
     </topics>
   </consumer>
</messageQueue>
复制代码

代码订阅

ConsumerGroupVo consumerGroup = new ConsumerGroupVo("Test1Sub");
ConsumerGroupTopicVo topicVo = new ConsumerGroupTopicVo();
topicVo.setName("Test");
topicVo.setSubscriber(new ISubscriber() {
    @Override
    public List<Long> onMessageReceived(List<MessageDto> messages) {
        return null;
    }
});
consumerGroup.addTopic(topicVo);
MqClient.registerConsumerGroup(consumerGroup);
复制代码

猿天地

关注下面的标签,发现更多相似文章
评论