RabbitMQ详解

3,969 阅读29分钟

前言

消息队列(Message Queue,简称MQ)现在在很多公司都有使用,而MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ,巧了,我们公司使用的是RabbitMq,所以本文主要介绍一下RabbitMq([捂脸]其实是我不会其他MQ,日后学了会再发的),主要从以下几点来讲:

1、RabbitMq的安装
2、SpringBoot项目中RabbitMq的使用 (五种模式)
3、延迟队列
4、RabbitMq使用过程中遇到的问题及解决方案(如消息重试机制,消息重复消费、消息丢失、消息的顺序消费等等)

原创不易,最后希望各位看官爸爸动动手点个赞吧!

正文

Mac下RabbitMq的安装(windows安装点这里


安装Homebrew

1、进入Homebrew官网,复制命令:

2、打开电脑中端,将命令粘贴至中端并按回车键执行,等待安装完成:

注意事项

1、如果出现粘贴命令回车之后出现curl: (7) Failed to connect to raw.githubusercontent.com port 443: Connection refused这样的提示,请连接手机热点再次尝试,如果还不行请点击此处

安装RabbitMq

1、打开终端,输入如下命令:brew install rabbitmq,点击回车键等待安装,看到如下界面即为成功:

启动RabbitMq

1、在安装完成之后在中端输入如下内容即可启动,看到如图4步骤即为启动成功(启动成功之后别关闭中端,不然登陆浏览器是看不到的):

1、cd /usr/local/Cellar/rabbitmq/
2、cd 3.8.3(版本号看你自己的是多少)
3、sbin/rabbitmq-server 

2、登录浏览器 http://localhost:15672 进行查看 初始化默认密码都为guest/guest

添加用户及授权

添加用户

(1)、超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

(2)、监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

(3)、策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

(4)、普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

授权

RabbitMq默认virtual host为 /, 如果想自定义virtual host,请点击这里

SpringBoot项目中RabbitMq的使用


配置项目

添加依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

修改配置文件

spring:
  rabbitmq:
    host: 自己rabbit的路径
    port: 自己rabbit的端口
    virtual-host: /
    username: 自己rabbit的账号
    password: 自己rabbit的密码
    publisher-confirms: true

项目下新建rabbit包,内容可直接复制粘贴过去

1、增加获取配置文件信息的实体类

@Data
@ToString
@Configuration
public class RabbitProperties {

    /**
     * rabbitmq 服务器地址
     */
    @Value("${spring.rabbitmq.host}")
    private String host;

    /**
     * rabbitmq 服务器端口
     */
    @Value("${spring.rabbitmq.port}")
    private int port;

    /**
     * rabbitmq 账号
     */
    @Value("${spring.rabbitmq.username}")
    private String username;

    /**
     * rabbitmq 密码
     */
    @Value("${spring.rabbitmq.password}")
    private String password;
}

2、增加配置类

@Configuration
public class RabbitConfiguration {

    @Autowired
    private RabbitProperties rabbitProperties;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitProperties.getHost(), rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    /**
     * @return
     * @Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE)这个是说在每次注入的时候回自动创建一个新的bean实例
     * @Scope(value=ConfigurableBeanFactory.SCOPE_SINGLETON)单例模式,在整个应用中只能创建一个实例
     * @Scope(value=WebApplicationContext.SCOPE_GLOBAL_SESSION)全局session中的一般不常用
     * @Scope(value=WebApplicationContext.SCOPE_APPLICATION)在一个web应用中只创建一个实例
     * @Scope(value=WebApplicationContext.SCOPE_REQUEST)在一个请求中创建一个实例
     * @Scope(value=WebApplicationContext.SCOPE_SESSION)每次创建一个会话中创建一个实例
     * proxyMode=ScopedProxyMode.INTERFACES创建一个JDK代理模式
     * proxyMode=ScopedProxyMode.TARGET_CLASS基于类的代理模式
     * proxyMode=ScopedProxyMode.NO(默认)不进行代理
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
        // template.setMandatory(true);
        return template;
    }
}

3、定义队列、交换器等名称(这里有新增了订单相关队列和交换器)

public class RabbitMqKey {

    /**
     * 订单-队列
     */
    public static final String TRADE_ORDER_QUEUE = "trade-order-queue";

    /**
     * 订单-交换器
     */
    public static final String TRADE_ORDER_EXCHANGE = "trade-order-exchange";

}

4、初始化队列、交换器等并绑定关系

@Component
public class TradeOrderQueueConfig {


    private final static Logger logger = LoggerFactory.getLogger(TradeOrderQueueConfig.class);

    /**
     * 创建队列
     * Queue 可以有4个参数
     * String name: 队列名
     * boolean durable: 持久化消息队列,rabbitmq 重启的时候不需要创建新的队列,默认为 true
     * boolean exclusive: 表示该消息队列是否只在当前的connection生效,默认为 false
     * boolean autoDelete: 表示消息队列在没有使用时将自动被删除,默认为 false
     * Map<String, Object> arguments:
     *
     * @return
     */
    @Bean(name = "queue")
    public Queue queue() {
        logger.info("queue : {}", RabbitMqKey.TRADE_ORDER_QUEUE);
        // 队列持久化
        return new Queue(RabbitMqKey.TRADE_ORDER_QUEUE, true);
    }

    /**
     * 创建一个 Fanout 类型的交换器
     * <p>
     * rabbitmq中,Exchange 有4个类型:Direct,Topic,Fanout,Headers
     * Direct Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中;
     * Topic Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中;
     * Fanout Exchange:直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key;
     * Headers Exchange:将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中;
     *
     * @return
     */
    @Bean(name = "fanoutExchange")
    public FanoutExchange fanoutExchange() {
        logger.info("exchange : {}", RabbitMqKey.TRADE_ORDER_EXCHANGE);
        return new FanoutExchange(RabbitMqKey.TRADE_ORDER_EXCHANGE);
    }

    /**
     * 把队列(Queue)绑定到交换器(Exchange)
     * topic 使用路由键(routingKey)
     *
     * @return
     */
    @Bean
    Binding fanoutBinding(@Qualifier("queue") Queue queue,
                    @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

5、发送消息类

@Component
public class Sender {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 如果rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
     * 需手动注入
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 订单信息(发送至交换器)
     *
     * @param payload
     * @return
     */
    public String orderSendExchange(Object payload){
        return baseSend(RabbitMqKey.TRADE_ORDER_EXCHANGE, "", payload, null, null);
    }

    /**
     * 订单信息(发送至队列)
     *
     * @param payload
     * @return
     */
    public String orderSendQueue(Object payload){
        return baseSend("", RabbitMqKey.TRADE_ORDER_QUEUE, payload, null, null);
    }

    /**
     * MQ 发送数据基础方法
     *
     * @param exchange  交换器名
     * @param routingKey  队列名
     * @param payload 消息信息
     * @param uniqueMessageId  标示id,不传可自动生成
     * @param messageExpirationTime  持久化时间
     * @return 消息编号
     */
    public String baseSend(String exchange, String routingKey, Object payload, String uniqueMessageId, Long messageExpirationTime) {
        // 生成消息ID
        String finalUniqueMessageId = uniqueMessageId;
        if (StringUtils.isBlank(uniqueMessageId)) {
            uniqueMessageId = UUID.randomUUID().toString();
        }
        logger.info("SEND --- unique message id:{}", uniqueMessageId);

        // 消息属性
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 消息属性中写入消息编号
                message.getMessageProperties().setMessageId(finalUniqueMessageId);
                // 消息持久化时间
                if (!StringUtils.isEmpty(String.valueOf(messageExpirationTime))) {
                    logger.info("设置消息持久化时间:{}", messageExpirationTime);
                    message.getMessageProperties().setExpiration(Long.toString(messageExpirationTime));
                }
                // 设置持久化模式
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }
        };

        logger.info("SEND --- messagePostProcessor:{}", messagePostProcessor);

        // 消息
        Message message = null;
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            String json = objectMapper.writeValueAsString(payload);
            logger.info("发送消息:{}", payload.toString());
            // 转换数据格式
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentEncoding(MessageProperties.CONTENT_TYPE_JSON);
            message = new Message(json.getBytes(), messageProperties);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        // correlationData
        CorrelationData correlationData = new CorrelationData(uniqueMessageId);

        /**
         * convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
         * exchange: 路由
         * routingKey: 绑定key
         * message: 发送消息
         * messagePostProcessor: 消息属性处理类
         * correlationData: 对象内部只有一个 id 属性,用来表示当前消息唯一性
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);

        return finalUniqueMessageId;
    }
}

6、确认信息

@Component
public class RabbitAck implements RabbitTemplate.ConfirmCallback {

    private final static Logger logger = LoggerFactory.getLogger(RabbitAck.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info("ACK --- MQ message id: {}" + correlationData);
        if (ack) {
            logger.info("ACK --- Message sent confirmation success!");
        } else {
            logger.info("ACK --- MQ message id: {}", correlationData.getId());
            logger.info("ACK --- MQ confirmetion: {}", ack);
            logger.info("ACK --- Message sending confirmation failed, reason for failure:" + cause);
        }
    }
}

实践

1、简单队列

p:消息生产者(就是寄信的人)
c:消息消费者(就是收信的人)
红色格子: 队列(就是邮差,送信的人)

发送至队列的消息只会被一个消费者获取然后消费,要想多个消费者同时接收消费请看后面的内容!

1、发送消息

(1) 新增发送消息接口

@RestController
public class ProducersController {

    @Resource
    private Sender sender;

    @PostMapping("/producers")
    public void producers(){
        sender.orderSendQueue("Hello World");
    }
}

(2)PostMan调用接口

(3)查看项目控制台

(4)查看rabbitmq

2、接收消息

(1)新增接收订单队列信息的listener

@Component
public class OrderQueueListener {

    private static final Logger logger = LoggerFactory.getLogger(OrderQueueListener.class);

    /**
     * 接收消息
     *
     * @param message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_ORDER_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isBlank(msg)) {
                logger.warn("接收的数据为空");
                return;
            }
            System.out.println(msg);
        } catch (Exception e) {
            logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

(2)由于刚才我们已经发送了信息,这里只要启动项目就能接收到信息

2、Work模式

p:消息生产者(就是寄信的人)
c1,c2:消息消费者(就是收信的人)
红色格子: 队列(就是邮差,送信的人)

(1)发送消息,在刚才的controller中添加一个接口,循环发送100条消息

@PostMapping("/batch/producers")
public void batchProducers(){
    for (int i = 0; i < 100; i++){
        sender.orderSendQueue("Hello World" + i);
    }
}

(2)开两个服务,都接收同一个队列的信息

@RabbitListener(queues = RabbitMqKey.TRADE_ORDER_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isBlank(msg)) {
                logger.warn("接收的数据为空");
                return;
            }
            System.out.println("服务1接收到的数据:" + msg);
        } catch (Exception e) {
            logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
            e.printStackTrace();
        }
    }
@RabbitListener(queues = RabbitMqKey.TRADE_ORDER_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isBlank(msg)) {
                logger.warn("接收的数据为空");
                return;
            }
            System.out.println("服务2接收到的数据:" + msg);
        } catch (Exception e) {
            logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
            e.printStackTrace();
        }
    }

(3)查看两个服务的结果

结果:有多个服务接收同一个队列的信息,MQ默认是用轮询分发的方式发送信息的!如果你想采用公平分发(能者多劳)模式的话,请看下面

(4)修改服务1配置文件

(5)修改服务2配置文件

(6)重新测试,查看结果(看下图可知服务2接收到的消息明显多于服务1收到的消息)

3、订阅模式

p:消息生产者(就是寄信的人)
x:交换器(就是邮局)
c1,c2:消息消费者(就是收信的人)
红色格子: 队列(就是邮差,送信的人)

一封信通过邮局邮给两个人,邮局就会派两个邮差送信给相应的人,这里就是两个不同的队列绑定同一个交换器,确保一条消息两个服务都能同时接收到。

rabbitmq中,Exchange 有4个类型:Direct,Topic,Fanout,Headers
Direct Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中;
Topic Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中;
Fanout Exchange:直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key;
Headers Exchange:将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中;

Direct Exchange、Topic Exchange进行Binding的时候,需要指定Routing key

Fanout Exchange、Headers Exchange进行Binding的时候,不需要指定Routing key

1、队列绑定交换器

从前文可看到,我已经将订单队列和交换器绑定了(订单交换器的类型是Fanout Exchange)

现在将另一个服务也绑定一下

2、查看绑定关系

3、启动两个服务,发送信息至订单交换器

(1) 新增发送至交换器接口

(2)调用接口,执行发送

(3)查看两个服务结果

如果交换器没有绑定队列的话,那么发送到交换器的消息将会丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。

4、路由模式

路由模式跟发布订阅模式类似,然后在订阅模式的基础上加上了类型,订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由键的队列,就是按照某种规则指定发送到交换器的内容由哪些队列接收,并不一定所有绑定交换器的队列都能接收到。

p:消息生产者(就是寄信的人)
x:direct类型的交换器(就是邮局)
c1,c2:消息消费者(就是收信的人)
红色格子: 队列(就是邮差,送信的人)

上图是一个结合日志消费级别的配图,在路由模式它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的 direct 模式。

以上图的配置为例,我们以 routingKey="error" 发送消息到 Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…)。如果我们以 routingKey="info" 或 routingKey="warning" 来发送消息,则消息只会路由到 Queue2。如果我们以其他 routingKey 发送消息,则消息不会路由到这两个 Queue 中。

1、新建队列、交换器以及路由并绑定

(1)服务1:

public class RabbitMqKey {

    /**
     * 订单-队列
     */
    public static final String TRADE_ORDER_QUEUE = "trade-order-queue";

    /**
     * 订单-交换器
     */
    public static final String TRADE_ORDER_EXCHANGE = "trade-order-exchange";

    /**
     * 路由测试队列
     */
    public static final String TRADE_DIRECT_TEST_QUEUE = "trade-direct-test-queue";

    /**
     * 路由测试交换器
     */
    public static final String TRADE_DIRECT_TEST_EXCHANGE = "trade-order-exchange";

    /**
     * 路由
     */
    public static final String ROUTING_KEY = "ERROR";

}

TradeOrderQueueConfig类中绑定关系:

(2)服务2:

/**
     * 路由测试队列
     */
    public static final String TRADE_DIRECT_TEST_QUEUE_V2 = "trade-direct-test-queue-v2";

    /**
     * 路由测试交换器
     */
    public static final String TRADE_DIRECT_TEST_EXCHANGE = "trade-order-exchange";

    /**
     * 路由
     */
    public static final String ROUTING_KEY = "INFO";

2、查看绑定关系

3、两个服务分别新建DirectListener类,接收队列信息

(1)服务1:

@Component
public class DirectListener {

    private static final Logger logger = LoggerFactory.getLogger(DirectListener.class);

    /**
     * 接收消息
     *
     * @param message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_DIRECT_TEST_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("接收的数据为空");
                return;
            }
            System.out.println("服务1接收到的数据:" + msg);

        } catch (Exception e) {
            logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

(2)服务2:

@Component
public class DirectListener {

    private static final Logger logger = LoggerFactory.getLogger(DirectListener.class);

    /**
     * 接收消息
     *
     * @param message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_DIRECT_TEST_QUEUE_V2)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("接收的数据为空");
                return;
            }
            System.out.println("服务2接收到的数据:" + msg);

        } catch (Exception e) {
            logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

4、Sender类增加指定路由发送消息方法,并增加发送消息接口

/**
     * ERROR路由发送消息至交换器
     *
     * @param payload
     * @return
     */
    public String errorSendQueue(Object payload){
        return baseSend(RabbitMqKey.TRADE_DIRECT_TEST_EXCHANGE, RabbitMqKey.ROUTING_KEY, payload, null, null);
    }

    /**
     * INFO路由发送消息至交换器
     *
     * @param payload
     * @return
     */
    public String infoSendQueue(Object payload){
        return baseSend(RabbitMqKey.TRADE_DIRECT_TEST_EXCHANGE, "INFO", payload, null, null);
    }

在ProducersController类中增加两个接口

/**
     * info
     */
    @PostMapping("/send/info")
    public void sendInfo(){
        sender.infoSendQueue("我是info级别的日志,你可以不用管我");
    }

    /**
     * info
     */
    @PostMapping("/send/error")
    public void sendError(){
        sender.errorSendQueue("我是error级别的日志,你也可以不用管我,只要你不怕死");
    }

然后重新启动一下两个服务

(5)调用/send/info接口,查看结果

服务1没有接收到消息:

服务2接收到了消息:

(6)调用/send/error接口,查看结果

服务1:

服务2:

5、主题模式(通配符模式)

p:消息生产者(就是寄信的人)
x:topic类型的交换器(就是邮局)
c1,c2:消息消费者(就是收信的人)
红色格子: 队列(就是邮差,送信的人)

1、新建队列、交换器以及绑定关系

(1)服务1:

/**
     * 路由测试队列
     */
    public static final String TRADE_TOPIC_TEST_QUEUE = "trade-topic-test-queue";

    /**
     * 路由测试交换器
     */
    public static final String TRADE_TOPIC_TEST_EXCHANGE = "trade-topic-test-exchange";

    /**
     * 路由
     */
    public static final String TOPIC_ROUTING_KEY = "JAVA.#";

(2) 服务2:

/**
     * 路由测试队列
     */
    public static final String TRADE_TOPIC_TEST_QUEUE_V2 = "trade-topic-test-queue-v2";

    /**
     * 路由测试交换器
     */
    public static final String TRADE_TOPIC_TEST_EXCHANGE = "trade-topic-test-exchange";

    /**
     * 路由
     */
    public static final String TOPIC_ROUTING_KEY = "JAVA.*";

2、查看绑定关系

3、两个服务分别新建TopicListener类,接收队列信息

(1)服务1:

@Component
public class TopicListener {

    private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);

    /**
     * 接收消息
     *
     * @param message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_TOPIC_TEST_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("接收的数据为空");
                return;
            }
            System.out.println("服务1接收到的数据:" + msg);

        } catch (Exception e) {
            logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

(2)服务2:

@Component
public class TopicListener {

    private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);

    /**
     * 接收消息
     *
     * @param message
     */
    @RabbitListener(queues = RabbitMqKey.TRADE_TOPIC_TEST_QUEUE_V2)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("接收的数据为空");
                return;
            }
            System.out.println("服务2接收到的数据:" + msg);

        } catch (Exception e) {
            logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

4、Sender类增加指定路由发送消息方法,并增加发送消息接口

/**
     * 发送消息至topic类型的交换器
     *
     * @param payload
     * @return
     */
    public String topicErrorSendQueue(Object payload){
        return baseSend(RabbitMqKey.TRADE_TOPIC_TEST_EXCHANGE, "JAVA.LOG", payload, null, null);
    }

    /**
     * 发送消息至topic类型的交换器
     *
     * @param payload
     * @return
     */
    public String topicInfoSendQueue(Object payload){
        return baseSend(RabbitMqKey.TRADE_TOPIC_TEST_EXCHANGE, "JAVA.LOG.ERROR", payload, null, null);
    }

在ProducersController类中增加两个接口

/**
     * 发送信息
     */
    @PostMapping("/send/java")
    public void sendJava(){
        sender.topicErrorSendQueue("JAVA.*:匹配不多不少一个词 JAVA.#:匹配一个或多个词");
    }

    /**
     * 发送信息
     */
    @PostMapping("/send/java/error")
    public void sendJavaError(){
        sender.topicInfoSendQueue("JAVA.*:匹配不多不少一个词 JAVA.#:匹配一个或多个词");
    }

5、调用/send/java接口,查看结果(路由key为JAVA.LOG,所以两个服务都能匹配的到)

(1)服务1:

(2)服务2:

6、调用/send/java/error接口,查看结果(路由key为JAVA.LOG.ERROR,所以只有JAVA.#能匹配的到)

(1)服务1:

(2)服务2:

通配模式路由key的这里要用*.*,注意中间是点,不能换别的!

延迟队列


什么是延迟队列

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。

其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列的使用场景

(1)订单在十分钟之内未支付则自动取消。
(2)新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
(3)账单在一周内未支付,则自动结算。
(4)用户注册成功后,如果三天内没有登陆则进行短信提醒。
(5)用户发起退款,如果三天内没有得到处理则通知相关运营人员。
(6)预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

实现

rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换器(DLX)和设置过期时间(TTL)结合起来实现延迟队列

1.TTL
TTL是Time To Live的缩写, 也就是生存时间。
RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
如果两种方式一起使用消息对TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。
设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。

2.DLX和死信队列
DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。
死信队列是指队列(正常)上的消息(过期)变成死信后,能够后发送到另外一个交换机(DLX),然后被路由到一个队列上,
这个队列,就是死信队列
成为死信一般有以下几种情况:
消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
消息的TTL-存活时间已经过期
队列长度限制被超越(队列满)

注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,
注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键

通过DLX和TTL模拟出延迟队列的功能,即:a交换器绑定a1队列,发送消息到a交换器,消息会被保存在a1队列内,而a1队列内的消息会设置过期时间,等到了过期时间还没有被消费,该消息就会发送给死信交换器b,而b和死信队列b1绑定,我们只需要消费b1里面的消息就行。

1、定义交换器及队列名称

/**
     * 接收延迟消息的队列
     */
    public static final String TRADE_ORDER_DELAY_QUEUE = "trade-order-delay-queue";
    /**
     * DLX,dead letter发送到的 exchange
     * 接收延迟消息的队列交换器
     */
    public static final String TRADE_ORDER_DELAY_EXCHANGE = "trade-order-delay-exchange";
    /**
     * routing key 名称
     * 具体消息发送在该 routingKey 的
     */
    public static final String ORDER_DELAY_ROUTING_KEY = "order-delay";

    /**
     * 接收死信消息的queue - queue
     */
    public static final String DEAD_LETTER_QUEUE = "dead-letter-queue";

    /**
     * 接收死信消息的exchange - exchange
     */
    public static final String DEAD_LETTER_EXCHANGE = "dead-letter-exchange";

    /**
     * routing key 名称
     */
    public static final String DEAD_LETTER_ROUTING_KEY = "dead-letter";

2、TradeOrderQueueConfig类初始化交换器和队列,并绑定

/**
     * 接收延迟信息的队列,并指定过期时间,以及过期之后要发送到哪个死信交换器,以及死信交换器的路由
     *
     * @return
     */
    @Bean(name = "delayOrderQueue")
    public Queue delayOrderQueue() {
        Map<String, Object> params = new HashMap<>(2);
        // x-dead-letter-exchange 声明了当前队列绑定的死信交换机
        params.put("x-dead-letter-exchange", RabbitMqKey.DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", RabbitMqKey.DEAD_LETTER_ROUTING_KEY);
        // x-message-ttl 队列过期时间
        params.put("x-message-ttl", 100000);
        return QueueBuilder.durable(RabbitMqKey.TRADE_ORDER_DELAY_QUEUE).withArguments(params).build();
    }

    /**
     * 接收延迟信息的交换器
     *
     * @return
     */
    @Bean(name = "orderDelayExchange")
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(RabbitMqKey.TRADE_ORDER_DELAY_EXCHANGE);
    }

    @Bean
    Binding orderDelayBinding(@Qualifier("delayOrderQueue") Queue delayOrderQueue,
                         @Qualifier("orderDelayExchange") DirectExchange orderDelayExchange) {
        return BindingBuilder.bind(delayOrderQueue).to(orderDelayExchange).with(RabbitMqKey.ORDER_DELAY_ROUTING_KEY);
    }

    /**
     * 接收死信队列内的信息 - queue
     * @return
     */
    @Bean(name = "orderQueue")
    public Queue orderQueue() {
        return new Queue(RabbitMqKey.DEAD_LETTER_QUEUE, true);
    }

    /**
     * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
     * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
     **/
    @Bean(name = "orderTopicExchange")
    public TopicExchange orderTopicExchange() {
        return new TopicExchange(RabbitMqKey.DEAD_LETTER_EXCHANGE);
    }

    @Bean
    Binding orderTopicBinding(@Qualifier("orderQueue") Queue orderQueue,
                              @Qualifier("orderTopicExchange") TopicExchange orderTopicExchange) {
        return BindingBuilder.bind(orderQueue).to(orderTopicExchange).with(RabbitMqKey.DEAD_LETTER_ROUTING_KEY);
    }

3、查看绑定信息

4、增加DelayListener类接收延迟消息

@Component
public class DelayListener {

    private static final Logger logger = LoggerFactory.getLogger(DelayListener.class);

    /**
     * 接收延迟消息
     *
     * @param message
     */
    @RabbitListener(queues = RabbitMqKey.DEAD_LETTER_QUEUE)
    public void process(Message message) {
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isEmpty(msg)) {
                logger.warn("接收的数据为空");
                return;
            }
            logger.info("接收到的延迟消息:{}", msg);
        } catch (Exception e) {
            logger.warn("处理接收到数据,发生异常:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

5、增加发送延迟消息的方法和接口

Sender类增加发送方法:

/**
     * 发送延时队列信息
     *
     * @param payload
     * @return
     */
    public String delaySend(Object payload){
        return baseSend(RabbitMqKey.TRADE_ORDER_DELAY_EXCHANGE, RabbitMqKey.ORDER_DELAY_ROUTING_KEY, payload, null, null);
    }

ProducersController类增加接口

/**
     * 发送信息
     */
    @PostMapping("/send/delay/message")
    public void sendDelayMessage(){
        sender.delaySend("某某某订单已经失效,请归还库存");
    }

6、调用接口,查看结果

注意:这里的过期时间设定死了是100秒,就是说你发送到这个队列的信息,都是100秒过期的,如果你的业务有些是10秒过期的,有些是1分钟过期的,你就不能用这种方式,得用一个rabbitmq的插件,具体内容请点击这里

RabbitMq使用过程中遇到的问题及解决方案


点击查看Spring Boot + RabbitMQ 配置参数解释

消息重试机制

如果有某一条消息消费者业务逻辑执行失败,需要重新发送mq再次消费的话,可以利用rabbitmq的重试机制,而重试的话一般都需要指定重试次数以及重试间隔时间等,不然后面的消息会无法获取,设置重复次数的话一般有以下几种做法:

1、使用redis或者mongo等第三方存储当前重试次数。
2、在header中添加重试次数,并且使用channel.basicPublish() 方法重新将消息发送出去后将重试次数加1。
3、使用spring-rabbit中自带的retry功能。

如果你设置消息是手动确认的,那么第三种方式配置了是无效的,channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);这个方法是失败确认,第三个参数就是失败之后你是否要将消息重新放回队列,如果你配置了true,将消息放回队列中,那么消息会一直重试,而不会按照配置文件中重试次数走,这样会一直阻塞后面的消息,所以手动确认的就要用第一种或者第二种方式,自己设置重复次数,达到重复次数之后将信息转发到另一个队列中记录日志等等,然后手动确认成功。

第一种:

1、先测试一下上面说的内容:

配置的最大重试次数、最大重试时间、重试间隔等参数无用,消息会无限制重复。

2、如果一定要手动确认的话,可以看下下面图片,交换器绑定一个死信队列,在失败确认之后数据丢在死信队列,然后记录日志,人工干预:

3、如果要手动确认,并且要指定重复次数,重复次数放在缓存中(会阻塞后续消息,意思就是这5次重试执行完之后才会有下个消息):

(1)配置文件改为手动确认

(2)接收类里面的方法都改成手动确认,这里缓存的key:correlation就是我们在发送消息时添加进去的唯一ID

correlation:

(3)查看结果,超过5次就进入了死信队列(到时候可以拿死信队列中的数据来记录日志或提示人工干预等)

4、如果不想阻塞消息,想将失败的信息重新推入队列中,并且遵循先入先出原则的话请参照第二种方式

第二种:

请点击这里,大概思路是这样的(就是把重试的次数放在rabbitmq的头文件中,下次执行的时候你从头文件获取到信息,然后判断是否超过最大重复次数,然后进行相应处理),代码是否能运行我没试过。

第三种:

1、增加交换器及队列名称

    /**
     * 测试-队列
     */
    public static final String TEST_QUEUE = "test-queue";

    /**
     * 测试-交换器
     */
    public static final String TEST_EXCHANGE = "test-exchange";

    /**
     * 最大重试次数之后接收消息死信队列
     */
    public static final String DELAY_QUEUE = "delay-queue";
    /**
     * 最大重试次数之后接收消息的交换器
     */
    public static final String DELAY_EXCHANGE = "delay-exchange";
    /**
     * routing key 名称
     */
    public static final String DELAY_ROUTING_KEY = "delay-routing-key";

2、绑定关系

@Bean(name = "testExchange")
    public FanoutExchange testExchange() {
        logger.info("exchange : {}", RabbitMqKey.TEST_EXCHANGE);
        return new FanoutExchange(RabbitMqKey.TEST_EXCHANGE);
    }

    @Bean(name = "delayTestQueue")
    public Queue delayTestQueue() {
        logger.info("queue : {}", RabbitMqKey.TEST_QUEUE);
        // 队列持久化
        return new Queue(RabbitMqKey.TEST_QUEUE, true);
    }

    @Bean
    Binding delayTestBinding(@Qualifier("delayTestQueue") Queue delayTestQueue,
                          @Qualifier("testExchange") FanoutExchange testExchange) {
        return BindingBuilder.bind(delayTestQueue).to(testExchange);
    }

    @Bean(name = "delayQueue")
    public Queue delayQueue() {
        logger.info("queue : {}", RabbitMqKey.DELAY_QUEUE);
        // 队列持久化
        return new Queue(RabbitMqKey.DELAY_QUEUE, true);
    }

    @Bean(name = "delayExchange")
    public DirectExchange delayExchange() {
        logger.info("exchange : {}", RabbitMqKey.DELAY_EXCHANGE);
        return new DirectExchange(RabbitMqKey.DELAY_EXCHANGE);
    }

    @Bean
    Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,
                          @Qualifier("delayExchange") DirectExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitMqKey.DELAY_ROUTING_KEY);
    }

3、修改配置文件

  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: admin
    password:  admin
    # 是否启用【发布确认】
    publisher-confirms: true
    # 指定一个请求能处理多少个消息
    listener:
      simple:
        # 限流(海量数据,同时只能过来一条)
        prefetch: 1
        # 自动签收auto  手动 manual  默认自动签收
        acknowledge-mode: auto
        retry:
          # 开启重试机制
          enabled: true
          # 重试次数
          max-attempts: 5
          # 最大间隔时间
          max-interval: 20000
          # 重试间隔时间(单位毫秒)
          initial-interval: 3000
          #乘子  重试间隔*乘子得出下次重试间隔  3s  6s  12s  24s  此处24s>20s  走20s
          multiplier: 2
          # 重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
        default-requeue-rejected: false

4、TestListener类增加处理队列信息的方法

@Component
public class TestListener {

    private static final Logger logger = LoggerFactory.getLogger(TestListener.class);

    /**
     * 接收消息
     *
     * @param message
     */
    @RabbitListener(queues = RabbitMqKey.TEST_QUEUE)
    public void process(Message message) throws UnsupportedEncodingException {
        String msg = new String(message.getBody());
        if (StringUtils.isBlank(msg)) {
            logger.warn("接收的数据为空");
            return;
        }
        System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8"));
        //出现异常
        int a = 0;
        int b = 1 / a;
    }

    /**
     * 接收超过最大重试次数的消息
     *
     * @param message
     */
    @RabbitListener(queues = RabbitMqKey.DELAY_QUEUE)
    public void process1(Message message){
        try {
            String msg = new String(message.getBody());
            if (StringUtils.isBlank(msg)) {
                logger.warn("接收的数据为空");
                return;
            }
            System.out.println("接收到的死信消息:" + msg);
        } catch (Exception e) {
            logger.warn("处理接收到数据,发生异常:{}", e.getMessage());

        }
    }
}

5、Sender类增加发送消息方法

/**
     * 测试
     *
     * @param payload
     * @return
     */
    public String testSendExchange(Object payload){
        return baseSend(RabbitMqKey.TEST_EXCHANGE, "", payload, null, null);
    }

6、ProducersController增加测试接口

/**
     * 发送信息
     */
    @PostMapping("/send/test")
    public void sendTest(){
        sender.testSendExchange("测试消息重试机制");
    }

7、最重要的一点,RabbitAck类增加超过最大重试次数的信息转发的交换器

@Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, RabbitMqKey.DELAY_EXCHANGE, RabbitMqKey.DELAY_ROUTING_KEY);
    }

如果不手动配置MessageRecoverer,会默认使用RejectAndDontRequeueRecoverer,实现仅仅是将异常打印抛出,源码如下:

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {

    protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class);

    @Override
    public void recover(Message message, Throwable cause) {
    	if (this.logger.isWarnEnabled()) {
            this.logger.warn("Retries exhausted for message " + message, cause);
    	}
    	throw new ListenerExecutionFailedException("Retry Policy Exhausted", new AmqpRejectAndDontRequeueException(cause), message);
    }

}

8、测试,查看结果

(1)配置了MessageRecoverer:

(2)不配置了MessageRecoverer:

如果不是必须保证消息的投靠特别稳定、数据不能出现一点丢失。那么完全可以不用配自动重试的机制的,毕竟网络波动这种情况还是很少见的,业务逻辑执行有问题(比如空指针,数组越界等问题)你重试一百次也是报异常,这种就只能拉个程序员出来祭天就好了。

重复消费

在这里偷一张敖丙(一位非常优秀的作者)的图给大家解释一下

用户下了单,积分服务消费失败了,申请重试,而活动系统、优惠券等服务已经消费成功了,请求重试就是要重新发送一次这个消息,那已经消费成功的服务岂不是要多次消费?这样是不行的,轻则被老大骂,重则收拾铺盖回家,更重则被老板当场打死,那我们怎么办才能避免消息的重复消费呢?

一般这种问题的处理方式叫做接口幂等

幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。
在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。  例如,“setTrue()”函数就是一个幂等函数,无论多次执行,其结果都是一样的.更复杂的操作幂等保证是利用唯一交易号(流水号)实现.

通俗意思:你在接收到MQ的时候,校验一下这个mq你是否消费过,如果消费过就不在消费了,如果没有消费过就进行消费,一般业务都是有订单号的,可以用订单号+业务场景来判断,如果存在此订单号和业务场景就不在消费了,不存在就进行消费(我现在的项目就是接收到mq,校验一下mq传过来的单号是否存在,存在则打印异常日志,不存在则消费)

消息丢失

解决消息丢失问题有四种方法:

1.消息持久化(看上面的代码你就知道队列和交换器怎么持久化了)

2.ACK确认机制(上面也讲了)

3.设置集群镜像模式

4.消息补偿机制

具体的内容需要点击这里查看

具体解决方法还是看具体公司情况的,我公司这边是这样的,比如下单之后要扣减手续费,我们是先把手续费等接收下单成功mq之后记下来,然后晚上统一划转的,不实时划转主要是考虑到金额小数点问题,因为要按一定比例分给好几个账户,一次划转总比多次好,所以我们在定时任务划转金额的时候会按订单表数据校验当天所有订单,看是否有订单成功但是mq信息丢失的情况,如果有就自动补录手续费进去,当然这只是我们公司的处理方法,可能不太成熟,具体处理信息丢失问题还是看你们公司具体的情况再定!

顺序消费

这里的顺序消费是指消费者的顺序消费,rabbitmq队列中只是存放了发布方的顺序消息,但是消费者是否是顺序消费又是一回事

如上图,两者的结果是不是完全不一样了

解决方案:

一个Queue对应一下Consumer(消费者),把需要保证顺序的message都发送到一个queue当中,开启手动确认,设置prefetchCount=1,每次只消费一条信息,处理过后进行手工ack,然后接收下一条message,只是由一个Consumer进行处理

这里说一下,如果还是多个Consumer,使用同步处理,手工ack是不行的,第一时间每个Consumer都会收到message(如果message数量>consumer数量),剩余的message才会等到ack之后发送过来,所以还是无法保证顺序消费,如下图

按照我的想法来说,队列本来就是先进先出的,A提供者推送新增、修改、删除三条消息入队列,那么B消费者接收到的也应该是新增、修改、删除,由于是手动确认机制,B在接收到新增之后,执行成功才会手动确认,才会接收到修改,然后再接收到删除消息,这样也是保证了顺序(我不知道我这个理论有没有什么问题,如果你们知道有啥问题请评论告诉我一下,我改下,我也百度了一些,但是基本上都是同一个思路的)

这里有个文章,关于顺序消费的,有需要的可以看下!


本文章源码(服务1的,可能会有点乱,如果大家有什么问题可以评论,我看到会及时回应的)在这里,这个项目也是前几天刚弄的,只需要本地安装一个rabbitmq即可运行,以后会持续增加springcloud相关组件以及一些中间件的使用方式的!

如果有需要的话可以关注一下我的公众号,会即时更新Java相关技术文章,公众号内还有一些实用资料,如Java秒杀系统视频教程、黑马2019的教学资料(IDEA版)、BAT面试题汇总(分类齐全)、MAC电脑常用安装包(有一些是淘宝买的,已PJ的)。

跑路跑路,有缘下篇文章再见,还没点赞的童鞋记得点个赞哈!