中间件系列五 RabbitMQ之Direct exchange(直连交换机)和路由

281 阅读5分钟
原文链接: blog.csdn.net

1. 概述

上篇文章,我们通过Fanout exchange(扇型交换机)实现生产者发送一个消息,这个消息同时被传送给所有队列。但是有时我们不希望所有的消息都被所有队列接收,我们希望可以指定类型为a的消息只能被队列A接收,类型为b的消息只能被队列B,C接收。扇型交换机只能无脑地广播消息给所有的消费者,其实质是广播给所有关联的队列。 为了实现这个功能,一种是建立多个交换机,这种方式简单暴力但是不灵活。本节我们介绍使用单个直连交换机+路由实现以上功能。

本节主要内容如下:
- 使用单个直连交换机+路由对消息进行规则路由
- 路由绑定用法:单个绑定、多个绑定
- direct交换机的声明和用法
- 使用完整的代码演示以上的用法

2. 本文功能说明

本文通过如下两个例子说明绑定的用法和direct交换机的用法

2.1. 单个绑定

这里写图片描述

在上图中,有2个队列绑定到直连交换机上。队列Q1使用绑定值为orange,队列Q2绑定值为black,green。在这种情况下,如果生产者发送的消息的路由值为orange,则此消息会被路由到队列Q1。如果生产者发送的消息的路由值为blcak,green,则此消息会被路由到队列Q1。其它的消息会被丢弃

2.2. 多个绑定

这里写图片描述

我们也可以将相同的绑定值绑定到不同的队列中。如上图中,队列Q1和Q2使用的绑定值都black。如果生产者发送的消息的路由值为black,则此消息会被同时路由到队列Q1和队列Q2

3. 生产者的代码

主要业务逻辑如下:

1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个direct交换机 
5. 发送消息,并配置消息的路由键

第四五步的代码如下:

// 声明一个direct交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message = "RoutingSend-" + System.currentTimeMillis();
// 发送消息,并配置消息的路由键
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

完整代码
发送者代码: RoutingSend.java

4. 消费者的代码

主要业务逻辑如下:
1. 配置连接工厂
2. 建立TCP连接
3. 在TCP连接的基础上创建通道
4. 声明一个direct交换机
5. 声明一个临时队列
6. 将临时队列绑定到交换机上,并在队列上绑定多个绑定值
7. 接收消息并处理

第4,5,6步代码如下:

// 声明一个direct交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定路由,同一个队列可以绑定多个值
for (String colour : colours) {
    channel.queueBind(queueName, EXCHANGE_NAME, colour);
}

完整代码
消费者代码:RoutingRecv.java

5. 测试

5.1. 模拟上文中”单个绑定”的场景:

BasicTest.java

@Test
public void routing_1() throws InterruptedException {
    // 接收端 1:绑定 orange 值
    executorService.submit(() -> {
        String[] colours = {"orange"};
        RoutingRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, colours);
    });
    // 接收端 2:绑定 black、green 值
    executorService.submit(() -> {
        String[] colours = {"black","green"};
        RoutingRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, colours);
    });

    Thread.sleep(5* 100);
    // 发送端1 : 发送 black,只有接收端1收到
    executorService.submit(() -> {
        String routing = "orange";
        RoutingSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, routing);
    });

    // 发送端2 : 发送 green、black,只有接收端2收到
    executorService.submit(() -> {
        String routing = "green";
        RoutingSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, routing);
    });

    // sleep 10s
    Thread.sleep(10 * 1000);
}

以上代码启动2个消费者,消费者1绑定orange,消费者2绑定black、green;
启动2个生产者,生产者1发送消息的路由键为orange,此消息只被消费者1接收
生产者2发送消息的路由键为green,此消息只被消费者2接收,符合之前的分析

 [RoutingRecv-[orange]] Waiting for messages.
 [RoutingSend] Sent 'orange':'RoutingSend-1516003763435'
 [RoutingSend] Sent 'green':'RoutingSend-1516003763444'
 // 生产者1发送消息的路由键为orange,此消息只被消费者1接收
 [RoutingRecv-[orange]] Received 'orange':'RoutingSend-1516003763435'
 // 生产者2发送消息的路由键为green,此消息只被消费者2接收
 [RoutingRecv-[black, green]] Waiting for messages.

5.2. 模拟上文中”多个绑定”的场景

BasicTest.java

@Test
public void routing_2() throws InterruptedException {

    // 接收端:同时创建两个接收端,同时绑定black
    int recNum = 2;
    while(recNum-- > 0) {
         // 接收端:绑定 black 值
        executorService.submit(() -> {
            String[] colours = {"black"};
            RoutingRecv.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, colours);
        });
    }

    Thread.sleep(5* 100);
    // 发送端1 : 发送 black,所有的接收端都会收到
    executorService.submit(() -> {
        String routing = "black";
        RoutingSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, routing);
    });

    // 发送端2 : 发送 green,所有的接收端都不会收到
    executorService.submit(() -> {
        String routing = "green";
        RoutingSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, routing);
    });

    // sleep 10s
    Thread.sleep(10 * 1000);
}

以上代码启动2个消费者,都绑定black
启动2个生产者,生产者1发送消息的路由键为black,此消息被2个消费者都接收
生产者2发送消息的路由键为green,此消息被丢失,符合之前的分析

测试输出如下:

 [RoutingRecv-[black]] Waiting for messages.
 [RoutingRecv-[black]] Waiting for messages.
 // 生产者1发送消息的路由键为black,此消息被2个消费者都接收
 [RoutingSend] Sent 'black':'RoutingSend-1516003663034'
 [RoutingRecv-[black]] Received 'black':'RoutingSend-1516003663034'
 [RoutingRecv-[black]] Received 'black':'RoutingSend-1516003663034'
 // 生产者2发送消息的路由键为green,此消息被丢失
 [RoutingSend] Sent 'green':'RoutingSend-1516003663045'

6. 代码

上文的详细代码主要如下:
发送者代码: RoutingSend.java
消费者代码: RoutingRecv.java
测试代码:BasicTest.java的方法 routing_1() 和routing_2
所有的详细代码见github代码,请尽量使用tag v0.9,不要使用master,因为master一直在变,不能保证文章中代码和github上的代码一直相同