RabbitMQ入门教程(通过官网学习)

3,135 阅读11分钟

RabbitMQ

声明:本文的命令是在Win10系统,内容基本按照官网+谷歌浏览器进行学习RabbitMQ(兔子消息队列?).

官网地址:www.rabbitmq.com/

参考3y的什么是消息队列:github.com/ZhongFuChen…

注意:本文是学习教程,是对基础的使用和参数说明,真正线上使用不止这些配置,后续会进行线上使用更新。

安装

按照官网瞎摸索,瞎子过程就不说了。。。需要安装客户端软件(下载地址所示),以及开启服务软件在下载文档说明(如图哪个)

速度慢可以考虑使用迅雷下载,杠杠的。

下载地址:erlang.org/download/ot…

下载文档说明:www.rabbitmq.com/install-win…


结果:(要下载两个安装)


介绍

MQ即是(Message Queue消息队列),对于队列不陌生吧,就是具有先进先出的特性容器,那消息呢?是指要发送的内容,要提醒其他人的一条信息。

官网说明:(图片翻译:来源谷歌浏览器,注意下面翻译斑点应为文件)


消息传递通常使用的一些术语:

  • 生产者:就是发送消息的程序

  • 消费者:就是接收消息的程序

  • 队列:就是给要发送的消息进行缓存(给消息进行排排队,咱们消息一个一个来)

    简单形容过程就是生产者把消息给到队列,消费者从队列取出消息。但是这个框架还提供了许多强大的功能,比如保证消息处理成功、通知特定的消费者进行接收等等。

Hello World

需要事先安装好客户端软件和服务软件。建立一个Maven项目,添加RabbitMQ依赖。

maven搜索网站,直接搜索就有依赖:mvnrepository.com/


启动安装好的RabbitMQ Service start, 提示》》》请求的服务已经启动。(闪退的话尝试管理员启动)

终于准备好了,可以上代码了!!!!!官网第一个简单的hello world

package top.codexu.rabbitmq.hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * 发送类
 *
 * @author codeXu
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 采用工厂模式
        ConnectionFactory factory = new ConnectionFactory();
        // 设置主机,因是本地,故不用配置其他属性。
        factory.setHost("localhost");
        // try-with-resources语句:括号里进行嵌套字连接,使用工厂得到连接类,再建立一个通道。
        // try-with-resources特性是在括号里的资源不用手动finally进行关闭,前提是资源实现了java.lang.AutoCloseable或java.io.Closeable的类
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明一个队列:指定名字、是否持久、是否仅此连接、是否自动删除、队列的其他属性
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // 进行发布消息,指定交易所、队列名、消息的其他属性、消息体
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            // 打印看看是否发送了
            System.out.println(" [x] 发送 '" + message + "'");
        }
    }
}

package top.codexu.rabbitmq.hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 接收类
 *
 * @author codeXu
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] 等待消息.");

        // 声明一个回调,进行处理得到的队列消息。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 接收 '" + message + "'");
        };

        // 队列名、 如果服务器应考虑消息传递后已确认,则为true、传递回调、取消回调
        // 消费开启:一直保持着,得到了队列内容,就执行回调函数
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

    }

}

先运行消费者或生产者都无所谓,因消费者无明确关闭资源,会一直保持运行,故没使用try-with-resources。

结果能够发送和接收说明运行成功。

注意

若要查看本机上的队列,可以移动到对应的工具路径下:安装rabbitmq-service的目录下的\rabbitmq_server-3.8.4\sbin。


在对应的路径下运行rabbitmqctl.bat list_queues

显示结果(按照表格形式)有个name(列头)说明是哪个队列,message(列头)说明这个队列目前存在多少个消息。

工作队列模式

通过上面的hello world已经大致知道了这个调用的过程,咱们要深入的学习,了解清楚它的特性,继续学习!!!

工作队列是啥?就是当遇到了运行耗时久的任务,并且还得等待它完成,这个时候就可以使用工作队列,把这个耗时任务发送给别的工人(消费者)进行处理,生产者可以直接得到处理完的情况。


将要说到的特性:消息确认、消息持久、公平派遣

  • 消息确认:即是消费者处理完了,发送个通知告诉MQ我处理完成了

  • 消息持久:就是遇到了宕机后,会把消息给缓存或保存下来,使得下次启动能够不丢失,但不是百分百。

  • 循环调度:若开着两个或三个消费者的时候,当多个消息要接收,MQ是会自动循环找下一个,避免一直重复同一个或几个。

  • 公平派遣:当有两个消费者的时候,若一个消费者一直再累死累活,另外一个逍遥自在,这是不利于效率提升的,故可以通过设置,限制若A忙就找B去。

咱们还是直接代码说话,注释写详细点。(java代码写的好,很多可以见名知意)

package top.codexu.rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;

/**
 * @author codeXu
 */
public class NewTask {
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明一个队列:指定名字、是否持久、是否仅此连接、是否自动删除、队列的其他属性
            // 消息持久化前提:这个队列第一次声明的时候就是为持久的,并且在生产者端和消费者端都要声明为true。(第二个参数) 注意:这个持久不是百分百保证。
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            // 循环5发送5次消息
            for (int i = 0; i <= 5; i++) {
                String message = "第" + i + "个消息.";
                // 进行发布消息,指定交易所、队列名、消息的其他属性、消息体
                channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] 发送 '" + message + "'");
            }
        }
    }
}

package top.codexu.rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @author codeXu
 */
public class Worker {

    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
		// 这里也要为true
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] 等待消息.");

        // 公平派遣设置:一次仅接受一条未经确认的消息,为了实现公平的安排(其他人闲着就安排其他人)
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 接收 '" + message + "'");
            
            // 假设执行一个耗时的任务,使用等待举例。
            try {
                doWork(message);
            } catch (InterruptedException e) {
                e.getMessage();
            } finally {
                System.out.println(" [x] 结束");
                // 执行完成,返回确认消息,若不返回确认消息(下面这句),MQ就会认为这个消息没执行。
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 消息确认:设置false(默认状态)为需要消费者发送回确认。若没返回确认,这个消息就不算被处理过,就把这个消息从新在MQ上排队。
        // 目的就是为了不丢失消息,使得确保每个消息有执行成功。(网络抖动也不怕,会找别人执行)
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(10000);
            }
        }
    }
}

若通过系统要查看当前队列有那些消息还未确定的,可以通过以下命令:

> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

运行结果如下:

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
work    0       3
hello   0       0

name:队列名,messages_ready等待接收的消息, messages_unacknowledged未返回成功的消息(已被消费者处理过或正在处理)

发布/订阅

构造各种队列的参数说明:www.rabbitmq.com/queues.html

发布/订阅:即是一个生产者要发布消息,然后有订阅它的都能得到这个消息。实现和上面的差别不大,加了一个交换所再中间。咱们还是直接上代码。

package top.codexu.rabbitmq.publicsubscribe;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 发射日记
 *
 * @author codeXu
 */
public class EmitLog {

    /** 交互所的名字 */
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 采用交易所的模式,无须声明队列,直接声明交互所。
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = "info: Hello World!";

            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] 发送 '" + message + "'");
        }
    }
}

package top.codexu.rabbitmq.publicsubscribe;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * 接收日志类
 *
 * @author codeXu
 */
public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 交换所声明,类型是fanout(扇出)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        // 队列绑定到对应的交换所
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] 等待消息.");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 接收 '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

测试结果:启动多个接收日志,然后再启动发射类,即可看见结果是多个接收日志都有响应。

查看队列绑定的交换所:

rabbitmqctl list_bindings


存在一个logs交换所,类型exchange,目的名:XXX ,目的的类是的队列。

路由

对比发布订阅模式,路由是可以指定匹配的才接收。

举例(举例是很好的学习方式):现在情况如下,一个发布者对照多个接收者,作为发布者:我只想把内容发布出去,并给个等级它,交换所你帮我处理。作为接收者:我只能得到发布者的最高等级的消息,其他的我要来也没用。

上面的情况如果使用订阅发布模式是不能区分开等级的。故咱们使用接下要讲的路由模式。

等级是为了举例而使用的,现在咱们官方说法是路由键,就是一串字符,好比咱们下面要讲的例子,可以是"info" "warning" "error" "red" "blue"等等字符串,就是为了匹配不同的队列。路由键 == 事先商量好的匹配字符串


咱们用官网的图说话:发送者p把消息交给交易所X,然后发送的级别是info的话,就只有C2接收到了,发送error等级的话就两者可以收到。

上面已经讲清楚了路由的模式,现在咱们来看看实现,使用路由键匹配模式需要声明交换所的类型为直接(direct)类型。上代码,走起!!!

package top.codexu.rabbitmq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 注意:声明类型为直接交互类型(direct)
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            
            // 发送的路由键为warning,内容是Hello World!  路由键可以自己换
            String severity = "warning";
            String message = "Hello World!";
            
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }

}

package top.codexu.rabbitmq.routing;

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 咱们得到两个队列
        String queueName1 = channel.queueDeclare().getQueue();
        String queueName2 = channel.queueDeclare().getQueue();

        //队列1绑定多个路由键
        String[] severities1 = {"info", "warning", "error"};
        for (String severity : severities1) {
            channel.queueBind(queueName1, EXCHANGE_NAME, severity);
        }

        // 队列2只绑定一个info
        String[] severities2 = {"info"};
        for (String severity : severities2) {
            channel.queueBind(queueName2, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] 等待消息.");

        // 队列1进行接收响应
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 队列1接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });

        // 队列2进行接收响应
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 队列2接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
    }
}

测试结果:发送info等级的话,两个消费者都可以得到,发送warning、error等级的只有消费者1可以得到

主题

主题类型就是在路由的升级一下,就是高级匹配。

现在咱们来仔细说明一下!!!

首先,咱们发送端的路由键改为:info.black.ok 为什么要改呢?是为了说明主题的作用!!!

要想匹配上面的路由键,接收端咱们可以这样写:info.*.* 或 .*.*ok 这样等等。是不是"*"就是全匹配字符!!!

接收端还能这样写:info.#

咱们说明一下"*"和"#"的作用:

  • ("*") 可以代替一个单词

  • ("#") 可以代替零个或多个单词


Q1和Q2队列的匹配路由键如上图所示

  • 队列1表明对所有为橙色的感兴趣

  • 队列2表明对兔子以及懒惰的感兴趣

咱们来举例发送端的发送情况,看看匹配那些队列。

发送端的路由键匹配的队列备注
quick.orange.rabbitQ1和Q2Q1:orange,Q2:lazy
lazy.orange.elephantQ1和Q2Q1:orange,Q2:lazy
quick.orange.foxQ1
lazy.brown.foxQ2
lazy.pink.rabbitQ2
quick.brown.fox
quick.orange.male.rabbit队列1匹配的是3个.
lazy.orange.male.rabbitQ2#的作用是多个或零个

结尾

本文涉及到的交换所内置的交互类型:

  • DIRECT("direct") 扇出:特性是广播,在这个交换所的所有接收队列都会收到发送者的消息。

  • FANOUT("fanout") 直接:特性是 路由匹配。

  • TOPIC("topic") 主题:特性是路由匹配的升级,直接是一个字符串的,主题可以为指定前缀类型。

  • HEADERS("headers"); headers 匹配 AMQP 消息的 header 而不是路由键,听说几乎不用了 故博主未了解。

都看到这里了,还不点赞收藏一下吗?谢谢支持


唠叨唠叨,第一次正式的写博文,感觉掘金体验不是非常好。