阅读 201

SpringBoot系列(整合RabbitMQ)

问题:

    springboot整合RabbitMQ 简单使用。
    百度 安装和启动。
复制代码

原理:

    生产者发送消息给   rabbit(交换机,队列,key)
    消费者监听队列     处理结果
    
    
    延迟队列原理:
    "假延迟队列(先进先出策略,先放进去一个30秒,再放进去一个10秒,不会先处理10秒的)"
    1个交换机。绑定两个队列。
    A队列不设置消费者。
    B队列设置消费者。
    把消息发送到A队列(带上MessagePostProcessor,设置超时时间等)。
    因为A队列没有消费者,所以超过设置时间数据还没消费,数据就会变成死信(Dead-letter)。
    然后就根据A队列的配置。自动转发到另一个队列(B)中去了。
    B直接消费就可以了。

复制代码

文档:

    官方文档: https://www.rabbitmq.com/
复制代码

步骤:

    1.引入spring-boot-starter-amqp
    2.配置文件编写地址rabbit连接地址
    3.创建交换机和队列并绑定。
    4.创建生产者和消费者
    5.测试
复制代码

1.引入spring-boot-starter-amqp

        <!-- rabbit mq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
复制代码

2.配置文件编写地址rabbit连接地址

# TODO 改成自己的地址
spring.rabbitmq.host=
spring.rabbitmq.port=5672
spring.rabbitmq.username=
spring.rabbitmq.password=
复制代码

3.创建交换机和队列并绑定。


/**
 * 相关常量
 * @author dripy
 * @date 2019/12/23 17:37
 */
public class Constant {


    /**
     * 交换机
     */
    public static final String MQ_EXCHANGE = "dripy-test-exchange";

    /**
     * 普通队列相关
     */
    public static final String MQ_QUEUE = "dripy-test-queue";

    /**
     * 延迟队列相关
     */
    public static final String MQ_DELAY_QUEUE = "dripy-test-queue-delay";

}

复制代码
1个交换机,一个队列,一个交换机与队列绑定。
后面死信队列 和 死信队列与交换机绑定是为了测试延迟队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * rabbit MQ 配置
 * <p>
 * exChange 交换机 --->   key    --->  queue
 * 把key和queue的值设置成一样吧。
 * 交换机找key 。 根据key 找 队列
 *
 * @author dripy
 * @date 2019/12/23 17:31
 */
@Configuration
public class RabbitConfig {


    /**
     * 交换机
     *
     * @return
     */
    @Bean("sendCommodityExChange")
    public DirectExchange directExchange() {
        //交换器名称、是否持久化、是否自动删除
        return new DirectExchange(Constant.MQ_EXCHANGE, false, false);
    }

    /**
     * 正常队列
     *
     * @return
     */
    @Bean("sendCommodityQueue")
    public Queue directQueue() {
        //队列名字,是否持久化
        return new Queue(Constant.MQ_QUEUE, false);
    }

    /**
     * 交换机与正常队列绑定
     *
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    Binding binding(@Qualifier("sendCommodityQueue") Queue queue, @Qualifier("sendCommodityExChange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(Constant.MQ_QUEUE);
    }


    /**
     * 死信队列
     * <p>
     * 配置队列中数据超时未消费时, 直接转发到指定队列(正常队列)中去
     *
     * @return
     */
    @Bean("delayqueue")
    public Queue deadLetterQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 当队列超时后。转发到对应的交换机和队列中去
        arguments.put("x-dead-letter-exchange", Constant.MQ_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", Constant.MQ_QUEUE);
        return new Queue(Constant.MQ_DELAY_QUEUE, true, false, false, arguments);
    }

    /**
     * 交换机与死信队列绑定
     *
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    Binding delayBinding(@Qualifier("delayqueue") Queue queue, @Qualifier("sendCommodityExChange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(Constant.MQ_DELAY_QUEUE);
    }

}

复制代码

4.创建生产者和消费者

生产者
import com.diandi.rabbit.config.Constant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @author dripy
 * @date 2019/12/23 17:34
 */
@Controller
@RequestMapping("/test")
public class TestController {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    RabbitTemplate rabbitTemplate;

    @ResponseBody
    @PostMapping("/send")
    public String sendInfo() {
        logger.info("发送消息");

        // 把消息发送到指定的 交换机、key(key和队列名称设置成一样。方便理解)
        rabbitTemplate.convertAndSend(Constant.MQ_EXCHANGE, Constant.MQ_QUEUE, "直接消费");


        // 特别注意。 不要给死信队列 写消费者
        // 把消息发送到死信队列中。设置超时时间。
        // 当超过 超时时间后  会直接转发到 指定队列中
        MessagePostProcessor processor = message -> {
            // 暂定5秒
            message.getMessageProperties().setExpiration("5000");
            return message;
        };

        rabbitTemplate.convertAndSend(Constant.MQ_EXCHANGE, Constant.MQ_DELAY_QUEUE, "延迟消费", processor);


        return "ok";
    }
}


复制代码
消费者
package com.diandi.rabbit.listener;

import com.diandi.rabbit.config.Constant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 *
 * @author dripy
 * @date 2019/12/23 17:32
 */
@Component
@RabbitListener(queues = Constant.MQ_QUEUE)
public class TestListener {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void handler(String message) {
        logger.info("发放实物队列收到的消息:{}", message);
    }
}

复制代码

5.测试

调用接口模拟发送

调用接口模拟发送

项目结构:

关注下面的标签,发现更多相似文章
评论