阅读 212

关于RabbitMQ的记忆

消息队列(本文基于RabbitMQ展开)
最近项目中又一次用到了RabbitMQ,结合之前使用过的RabbitMQ经验,以及参考了一些博客,对RabbitMQ进行一次系统的总结,如有理解的不对的地方,还希望大家指出来,共同学习,共同进步。
本文涉及代码地址github.com/smzb/spring…

1.消息队列

1.1什么是消息队列

消息队列(Message Queue,简称MQ),我们可以根据全名来拆分,消息队列是由消息和队列组成的;消息(Message)很好理解,就是在应用之间传递的数据,可以是简单的字符串,也可以是负责的对象等等;关于队列,我们都知道是FIFO先进先出的规则;所以消息队列(Message Queue)可以理解为是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

1.2为什么要用消息队列

关于消息队列的优点,我们需要结合具体的业务场景来选择应用,在我之前做过的项目中主要是用来进行将同步操作变为异步,当然这个操作一定得是一些对业务影响不太大的行为。

异步

我们来想象一下这种场景,调用A系统之后,A系统需要在接口内再去调用B、C、D系统来完成一次操作,这样一来用户的等待时间会变得很长,用户肯定是不会等待的,所以这个时候我们可以将调用B、C、D通过消息队列来完成,用户调用A系统的时候直接返回就行。
我们之前的业务应用场景是基于互联网教育方面,老师在选择具体的班级之后,需要给所有的学生去发送短信和推送,因为短信是需要调用电信的网关来进行发送的,所以会一定的相应时间。所以我们将老师信息、消息内容、接受的学生集合放入到消息队列中,然后再启动多个消费者同时去消费发送。

解耦

A系统产生了一条比较重要的数据,B、C、D系统都需要调用。可能会有E系统也需要,可能D系统又不需要了,管理会很混乱。有了MQ,A将数据存入到消息队列中,其他需要的来获取就行。通过MQ的Pub/Sub发布订阅模式,可以使A系统与其他系统彻底解耦。

削峰

比如A系统存在某个时间段每秒并发为5000次,如果直接怼上去,那么系统是无法承受的,如果将请求先放入MQ之中,A系统每秒中就拉去自己的最大峰值请求数,就不会出现A系统崩溃的情况,但是MQ中会有短暂的消息积压,过了高峰期之后就会恢复正常。

1.3四种消息队列的对比

  • RabbitMQ 单机吞吐量万级、数据延迟在微秒内,延迟性最低、基于主从模式实现HA、基本不丢失数据
  • ActiveMQ 单机吞吐量万级、数据延迟在ms内、基于主从模式实现HA、偶尔会丢失数据
  • Kafka 单机吞吐量十万级、数据延迟在ms内、分布式架构实现HA、大量被应用在日志收集时、经过参数优化可以做到数据0丢失
  • RocketMQ 单机吞吐量十万级、数据延迟在ms内、分布式架构实现HA、经过参数优化可以做到数据0丢失

2.RabbitMQ介绍

2.1 RabbitMQ是什么

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

  • Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
  • Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  • Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  • Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • Connection 网络连接,比如一个TCP连接。
  • Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  • Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  • Broker 表示消息队列服务器实体。

2.2 RabbitMQ的问题

使用RabbitMQ的一个原则就是原本的消息一条也不能多,一条也不能少。多说的就是重复消费,少说的就是消息丢失

消息丢失(可靠性传递)

  • 生产者消息丢失
    生产者还没有将消息发送到RabbitMQ服务器消息就丢失了,这个时候有两种方案,一种是开启RabbitMQ的事务机制,等到消息发送到MQ服务器之后再提交事务,但是这个过程中是同步进行的,所以一般不会使用这种方式;另外一种就是开启confirm机制,是异步进行的。

  • RabbitMQ服务器消息丢失
    针对RabbitMQ服务器的消息丢失,根据RabbitMQ消息的路由过程我们可以大致知道,需要开启queue持久化、Exchange持久化、Messae持久化来保证消息不会丢失,即便RabbitMQ服务器宕机,重启之后消息还是存在的。

  • 消费者消息丢失 RabbitMQ默认是自动ACK的,假如说消费者挂了,但是RabbitMQ认为消费者已经消费了消息,那么就会出现消息丢失,所以我们需要开启消费者手动ACK模式来确保消息不会丢失。

消息重复消费(幂等性)

幂等性:执行多次所产生的影响与一次执行的影响相同。

关于RabbitMQ的消息幂等性,主要在于RabbitMQ的重试机制,因为网络原因或者客户端延迟消费导致重复消费。

解决方案: 使用Redis,系统维护一个全局唯一ID(MessageID),写入队列的时候将ID写入到Redis中进行缓存,消费者进行消费的时候查询Redis中是否有ID存在,如果存在的话,则为第一次消费,消费成功之后将ID删除,如果ID在系统中不存在的话,则提示是重复请求,当然不要忘记给ID设置过期时间。

消息顺序性消费

假设架构模式是一个 queue,多个 consumer。这个时候生产者向 RabbitMQ里发送了三条数据,顺序依次是data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。那么顺序就乱了,所以我们针对顺序性要求比较高的场景,最好使用一个queue对应一个consumer。

消息队列积压或者消费端不消费

  • 积压怎么解决
    紧急扩容:修复consumer的问题,确保恢复消费速度,然后将所有的consumer停掉,紧急征用几台机器做消费者进行消费,等积压的数据处理完之后,恢复原有的部署架构,重新用原有的consumer进行消费。
  • 消息过期丢失
    批量重导:手动把丢失的查出来之后,再灌入到MQ中进行消费

2.3 RabbitMQ的三种架构模式

单机模式

单机模式我们demo测试的时候可以进行,一般生产环境需要集群来保证高可用

普通集群模式

普通集群模式,就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个,我们创建的queue只会放在一个RabbitMQ实例上,但是每个实例都会去同步queue的元数据,(元数据:可以认为是queue的一些配置信息,通过元数据,可以找到queue所在的实例),我们消费的时候,如果连接上了另外一个实例,这个实例会从queue所在的实例上拉去数据过来。


普通集群模式的缺点,MQ集群内会出现大量的数据传输,如果存放queue的实例宕机之后,会出现短暂性的系统不可用,如果每个MQ都做了持久化机制,需要等待MQ重启之后才可以用

镜像集群模式

镜像集群模式的话,创建queue的时候,无论是元数据还是实际data都会存放在多个实例上,也就是说每个RabbitMQ都有这个queue的一个完整镜像,包含着queue的全部数据。

当然了我们可能会服务器的性能开销比较大,消息需要同步到所有的机器上,导致网络带宽压力和消耗很重,而且也没办法去线性的扩展,但这主要是因为RabbitMQ的架构决定了,因为RabbitMQ不是分布式架构的,关于分布式架构的消息队列有kafka、RocketMQ,我们在这里不涉及。

3.Spring Boot集成RabbitMQ(Topic主题模式)

关于RabbitMQ的就不再这里多说,推荐使用docker来进行安装,本文主要结合RabbitMQ的Topic模式来演示

  • 生产者 yml文件配置
spring:
  rabbitmq:
    host: *******
    virtual-host: /
    username: guest
    password: guest
复制代码

config配置

@Configuration
public class MqConfig {
    /**
     * android/ios推送 正常的队列、交换机、路由健
     */
    public final static String queueName="pushQueue";
    public final static String exchangeName="pushExchange";
    public final static String routingName="notice.push.*";
    
     @Bean
    public Queue pushQueue(){
        return new Queue(queueName);
    }


    @Bean
    public Exchange pushExchange(){
        return new TopicExchange(exchangeName);
    }


    @Bean
    public Binding bindingExchangePush(){
        return BindingBuilder.bind(pushQueue()).to(pushExchange()).with(routingName).noargs();
    }
}
复制代码

sender

@RestController
public class PushController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/pushMessage")
    public String pushMessage(){
        rabbitTemplate.convertAndSend(MqConfig.exchangeName,"notice.push.send","您有一条消息,请注意查收");
        return "OK";
    }
}
复制代码
  • 消费者 yml文件配置
spring:
  #配置mq的基本信息
  rabbitmq:
    host: ******
    virtual-host: /
    username: guest
    password: guest
    #开启手动消息确认
    listener:
      simple:
        acknowledge-mode: manual
复制代码

reciever

@Component
public class PushMessageReciever {
    private final Logger logger = LoggerFactory.getLogger(PushMessageReciever.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    @RabbitListener(queues = "pushQueue")
    public void process(Message message, Channel channel) throws Exception{
         System.out.println(message.getBody().toString());
         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
复制代码

启动顺序,先启动生产者,再启动消费者

4.RabbitMQ死信队列应用

4.1什么是死信队列

  • 死信队列:DLX,dead-letter-exchange
  • 利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX ,DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。我们可以监听这个队列中的消息做相应的处理。

4.2消息变成死信的集中情况

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

4.3死信队列的应用

  • 订单超时取消
    其实RabbitMQ是没有延时队列的,就是利用了死信队列的特性来实现延时处理消息的
    在用户下单30分钟之内,如果还没有支付的话,就取消订单,释放商品信息,这个时候就可以在创建订单的时候去声明一个队列,设置ttl时间为30分钟,当消息超过30分钟之后会变为死信消息,会自动将消息转发到死信队列绑定的订单判断交换机中,然后在订单判断消费者中来判断是否支付的具体操作
  • \color{red}{RabbitMQ的消息重试机制--可靠性消费}
    关于重试机制呢,以消费者消费重试机制为例,@RabbitListener底层使用AOP进行拦截,如果程序没有抛出异常,自动提交事务。 如果Aop使用异常通知拦截获取异常信息的话 , 自动实现补偿机制,该消息会一直缓存在Rabbitmq服务器端进行重放,一直重试到不抛出异常为准。 利用RabbitMQ实现消息重试和失败处理,实现可靠的消费消费。在消息消费异常时,自动延时将消息重试,当重试超过一定次数后,则列为异常消息,等待后续特殊处理。
  • 生产者 yml文件和上面demo中的一致;配置类如下
Configuration
public class MqConfig {
    /**
     * 推送 正常的队列、交换机、路由健
     */
    public final static String queueName="pushQueue";
    public final static String exchangeName="pushExchange";
    public final static String routingName="notice.push.*";

    /**
     * 推送重试的队列、交换机、路由健(利用死信队列进行间隔重试,指定重试的交换器 放回原来的队列中)
     */
    public final static String retryQueueName="retryPushQueue";
    public final static String retryExchangeName="retryPushExchange";
    public final static String retryRoutingName="notice.retry.myretry";

    /**
     * (超过重试次数之后的消息存放在失败队列中)失败队列、交换机、路由健
     */
    public final static String failQueueName="failPushQueue";
    public final static String failExchangeName="failPushExchange";
    public final static String failRoutingName="notice.fail.*";


    /**
     * 正常
     * @return
     */
    @Bean
    public Queue pushQueue(){
        return new Queue(queueName);
    }


    @Bean
    public Exchange pushExchange(){
        return new TopicExchange(exchangeName);
    }


    @Bean
    public Binding bindingExchangePush(){
        return BindingBuilder.bind(pushQueue()).to(pushExchange()).with(routingName).noargs();
    }


    /**
     * 重试
     *
     * x-dead-letter-exchange 指定将消息发送到哪个交换机
     * x-message-ttl 消息在队列中延迟5s后超时,消息会重新投递到指定的交换机
     * @return
     */
    @Bean
    public Queue retryQueue(){
        Map<String,Object> arg = new HashMap<String,Object>();
        arg.put("x-dead-letter-exchange","pushExchange");
        arg.put("x-dead-letter-routing-key","notice.push.retry");
        arg.put("x-message-ttl",5000);
        return QueueBuilder.durable(retryQueueName).withArguments(arg).build();
    }

    @Bean
    public Exchange retryExchange(){
        return new TopicExchange(retryExchangeName);
    }


    @Bean
    public Binding bindingExchangeRetry(){
        return BindingBuilder.bind(retryQueue()).to(retryExchange()).with(retryRoutingName).noargs();
    }

    /**
     * 失败
     * @return
     */

    @Bean
    public Queue failQueue(){
        return new Queue(failQueueName);
    }

    @Bean
    public Exchange failExchange(){
        return new TopicExchange(failExchangeName);
    }

    @Bean
    public Binding bindingExchangeFail(){
        return BindingBuilder.bind(failQueue()).to(failExchange()).with(failRoutingName).noargs();
    }
}
复制代码
  • 消费者
@Component
public class PushMessageReciever {
    private final Logger logger = LoggerFactory.getLogger(PushMessageReciever.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    @RabbitListener(queues = "pushQueue")
    public void process(Message message, Channel channel) throws Exception{
        try {
            /** 手动抛出异常,测试消息重试 */
//            int i = 1 / 0;
            throw new ArithmeticException() ;

        } catch (Exception e) {

            long retryCount = getRetryCount(message.getMessageProperties());

            if (retryCount >= 3) {
                /** 重试次数超过5次,则将消息发送到失败队列等待特定消费者处理或者人工处理 */
                try {
                    rabbitTemplate.convertAndSend("failPushExchange","notice.fail.myfail", message);
                    logger.info("消费者消费消息在重试3次后依然失败,将消息发送到fail队列,发送消息:" + new String(message.getBody()));
                } catch (Exception e1) {
                    logger.info("消息在发送到fail队列的时候报错:" + e1.getMessage() + ",原始消息:"+ new String(message.getBody()));
                }

            } else {
                try {
                    /** 重试次数不超过3次,则将消息发送到重试队列等待重新被消费(重试队列延迟超时后信息被发送到相应队列重新消费,即延迟消费)*/
                    rabbitTemplate.convertAndSend("retryPushExchange", "notice.retry.myretry", message);
                    logger.info("消费者消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第" + (retryCount+1) + "次重试");
                } catch (Exception e1) {
                    logger.info("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息");
                }
            }
        } finally {
            /**
             * 无论消费成功还是消费失败,都要手动进行ack,因为即使消费失败了,也已经将消息重新投递到重试队列或者失败队列
             *
             * 如果不进行ack,生产者在超时后会进行消息重发,如果消费者依然不能处理,则会存在死循环
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }


    /**
     * 获取重试次数
     * @param messageProperties
     * @return
     */
    public long getRetryCount(MessageProperties messageProperties) {
        Long retryCount = 0L;
        if (null != messageProperties) {
            List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
            if(deaths != null && deaths.size()>0){
                Map<String, Object> death = (Map<String, Object>)deaths.get(0);
                retryCount = (Long) death.get("count");
            }

        }
        return retryCount;
    }

}
复制代码

启动之后发送消息,消费端打印如下

这个时候我们就可以启动failed队列的消费者来进行处理

参考文章

消息队列之 RabbitMQ

关注下面的标签,发现更多相似文章
评论