RabbitMQ
声明:本文的命令是在Win10系统,内容基本按照官网+谷歌浏览器进行学习RabbitMQ(兔子消息队列?).
官网地址:www.rabbitmq.com/
参考3y的什么是消息队列:github.com/ZhongFuChen…
注意:本文是学习教程,是对基础的使用和参数说明,真正线上使用不止这些配置,后续会进行线上使用更新。
安装
按照官网瞎摸索,瞎子过程就不说了。。。需要安装客户端软件(下载地址所示),以及开启服务软件在下载文档说明(如图哪个)
速度慢可以考虑使用迅雷下载,杠杠的。
下载文档说明:www.rabbitmq.com/install-win…
结果:(要下载两个安装)
介绍
MQ即是(Message Queue消息队列),对于队列不陌生吧,就是具有先进先出的特性容器,那消息呢?是指要发送的内容,要提醒其他人的一条信息。
官网说明:(图片翻译:来源谷歌浏览器,注意下面翻译斑点应为文件)
消息传递通常使用的一些术语:
生产者:就是发送消息的程序
消费者:就是接收消息的程序
队列:就是给要发送的消息进行缓存(给消息进行排排队,咱们消息一个一个来)
简单形容过程就是生产者把消息给到队列,消费者从队列取出消息。但是这个框架还提供了许多强大的功能,比如保证消息处理成功、通知特定的消费者进行接收等等。
Hello World
需要事先安装好客户端软件和服务软件。建立一个Maven项目,添加RabbitMQ依赖。
maven搜索网站,直接搜索就有依赖:mvnrepository.com/
启动安装好的RabbitMQ Service start, 提示》》》请求的服务已经启动。(闪退的话尝试管理员启动)
终于准备好了,可以上代码了!!!!!官网第一个简单的hello world
package top.codexu.rabbitmq.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 发送类
*
* @author codeXu
*/
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 采用工厂模式
ConnectionFactory factory = new ConnectionFactory();
// 设置主机,因是本地,故不用配置其他属性。
factory.setHost("localhost");
// try-with-resources语句:括号里进行嵌套字连接,使用工厂得到连接类,再建立一个通道。
// try-with-resources特性是在括号里的资源不用手动finally进行关闭,前提是资源实现了java.lang.AutoCloseable或java.io.Closeable的类
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个队列:指定名字、是否持久、是否仅此连接、是否自动删除、队列的其他属性
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 进行发布消息,指定交易所、队列名、消息的其他属性、消息体
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
// 打印看看是否发送了
System.out.println(" [x] 发送 '" + message + "'");
}
}
}
package top.codexu.rabbitmq.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 接收类
*
* @author codeXu
*/
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] 等待消息.");
// 声明一个回调,进行处理得到的队列消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 接收 '" + message + "'");
};
// 队列名、 如果服务器应考虑消息传递后已确认,则为true、传递回调、取消回调
// 消费开启:一直保持着,得到了队列内容,就执行回调函数
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
先运行消费者或生产者都无所谓,因消费者无明确关闭资源,会一直保持运行,故没使用try-with-resources。
结果能够发送和接收说明运行成功。
注意
若要查看本机上的队列,可以移动到对应的工具路径下:安装rabbitmq-service的目录下的\rabbitmq_server-3.8.4\sbin。
在对应的路径下运行rabbitmqctl.bat list_queues
显示结果(按照表格形式)有个name(列头)说明是哪个队列,message(列头)说明这个队列目前存在多少个消息。
工作队列模式
通过上面的hello world已经大致知道了这个调用的过程,咱们要深入的学习,了解清楚它的特性,继续学习!!!
工作队列是啥?就是当遇到了运行耗时久的任务,并且还得等待它完成,这个时候就可以使用工作队列,把这个耗时任务发送给别的工人(消费者)进行处理,生产者可以直接得到处理完的情况。
将要说到的特性:消息确认、消息持久、公平派遣
消息确认:即是消费者处理完了,发送个通知告诉MQ我处理完成了
消息持久:就是遇到了宕机后,会把消息给缓存或保存下来,使得下次启动能够不丢失,但不是百分百。
若要强保证使用发布者确认:www.rabbitmq.com/confirms.ht…
循环调度:若开着两个或三个消费者的时候,当多个消息要接收,MQ是会自动循环找下一个,避免一直重复同一个或几个。
公平派遣:当有两个消费者的时候,若一个消费者一直再累死累活,另外一个逍遥自在,这是不利于效率提升的,故可以通过设置,限制若A忙就找B去。
咱们还是直接代码说话,注释写详细点。(java代码写的好,很多可以见名知意)
package top.codexu.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
/**
* @author codeXu
*/
public class NewTask {
private final static String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个队列:指定名字、是否持久、是否仅此连接、是否自动删除、队列的其他属性
// 消息持久化前提:这个队列第一次声明的时候就是为持久的,并且在生产者端和消费者端都要声明为true。(第二个参数) 注意:这个持久不是百分百保证。
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 循环5发送5次消息
for (int i = 0; i <= 5; i++) {
String message = "第" + i + "个消息.";
// 进行发布消息,指定交易所、队列名、消息的其他属性、消息体
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] 发送 '" + message + "'");
}
}
}
}
package top.codexu.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @author codeXu
*/
public class Worker {
private final static String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 这里也要为true
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] 等待消息.");
// 公平派遣设置:一次仅接受一条未经确认的消息,为了实现公平的安排(其他人闲着就安排其他人)
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 接收 '" + message + "'");
// 假设执行一个耗时的任务,使用等待举例。
try {
doWork(message);
} catch (InterruptedException e) {
e.getMessage();
} finally {
System.out.println(" [x] 结束");
// 执行完成,返回确认消息,若不返回确认消息(下面这句),MQ就会认为这个消息没执行。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 消息确认:设置false(默认状态)为需要消费者发送回确认。若没返回确认,这个消息就不算被处理过,就把这个消息从新在MQ上排队。
// 目的就是为了不丢失消息,使得确保每个消息有执行成功。(网络抖动也不怕,会找别人执行)
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.') {
Thread.sleep(10000);
}
}
}
}
若通过系统要查看当前队列有那些消息还未确定的,可以通过以下命令:
> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged运行结果如下:
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
work 0 3
hello 0 0
name:队列名,messages_ready等待接收的消息, messages_unacknowledged未返回成功的消息(已被消费者处理过或正在处理)
发布/订阅
构造各种队列的参数说明:www.rabbitmq.com/queues.html
发布/订阅:即是一个生产者要发布消息,然后有订阅它的都能得到这个消息。实现和上面的差别不大,加了一个交换所再中间。咱们还是直接上代码。
package top.codexu.rabbitmq.publicsubscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 发射日记
*
* @author codeXu
*/
public class EmitLog {
/** 交互所的名字 */
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 采用交易所的模式,无须声明队列,直接声明交互所。
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "info: Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] 发送 '" + message + "'");
}
}
}
package top.codexu.rabbitmq.publicsubscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 接收日志类
*
* @author codeXu
*/
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 交换所声明,类型是fanout(扇出)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
// 队列绑定到对应的交换所
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] 等待消息.");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 接收 '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
测试结果:启动多个接收日志,然后再启动发射类,即可看见结果是多个接收日志都有响应。
查看队列绑定的交换所:
rabbitmqctl list_bindings
存在一个logs交换所,类型exchange,目的名:XXX ,目的的类是的队列。
路由
对比发布订阅模式,路由是可以指定匹配的才接收。
举例(举例是很好的学习方式):现在情况如下,一个发布者对照多个接收者,作为发布者:我只想把内容发布出去,并给个等级它,交换所你帮我处理。作为接收者:我只能得到发布者的最高等级的消息,其他的我要来也没用。
上面的情况如果使用订阅发布模式是不能区分开等级的。故咱们使用接下要讲的路由模式。
等级是为了举例而使用的,现在咱们官方说法是路由键,就是一串字符,好比咱们下面要讲的例子,可以是"info" "warning" "error" "red" "blue"等等字符串,就是为了匹配不同的队列。路由键 == 事先商量好的匹配字符串
咱们用官网的图说话:发送者p把消息交给交易所X,然后发送的级别是info的话,就只有C2接收到了,发送error等级的话就两者可以收到。
上面已经讲清楚了路由的模式,现在咱们来看看实现,使用路由键匹配模式需要声明交换所的类型为直接(direct)类型。上代码,走起!!!
package top.codexu.rabbitmq.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 注意:声明类型为直接交互类型(direct)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 发送的路由键为warning,内容是Hello World! 路由键可以自己换
String severity = "warning";
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
}
package top.codexu.rabbitmq.routing;
import com.rabbitmq.client.*;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 咱们得到两个队列
String queueName1 = channel.queueDeclare().getQueue();
String queueName2 = channel.queueDeclare().getQueue();
//队列1绑定多个路由键
String[] severities1 = {"info", "warning", "error"};
for (String severity : severities1) {
channel.queueBind(queueName1, EXCHANGE_NAME, severity);
}
// 队列2只绑定一个info
String[] severities2 = {"info"};
for (String severity : severities2) {
channel.queueBind(queueName2, EXCHANGE_NAME, severity);
}
System.out.println(" [*] 等待消息.");
// 队列1进行接收响应
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 队列1接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });
// 队列2进行接收响应
DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 队列2接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
}
}
测试结果:发送info等级的话,两个消费者都可以得到,发送warning、error等级的只有消费者1可以得到
主题
主题类型就是在路由的升级一下,就是高级匹配。
现在咱们来仔细说明一下!!!
首先,咱们发送端的路由键改为:info.black.ok 为什么要改呢?是为了说明主题的作用!!!
要想匹配上面的路由键,接收端咱们可以这样写:info.*.* 或 .*.*ok 这样等等。是不是"*"就是全匹配字符!!!
接收端还能这样写:info.#
咱们说明一下"*"和"#"的作用:
("*") 可以代替一个单词
("#") 可以代替零个或多个单词
Q1和Q2队列的匹配路由键如上图所示
队列1表明对所有为橙色的感兴趣
队列2表明对兔子以及懒惰的感兴趣
咱们来举例发送端的发送情况,看看匹配那些队列。
发送端的路由键 | 匹配的队列 | 备注 |
---|---|---|
quick.orange.rabbit | Q1和Q2 | Q1:orange,Q2:lazy |
lazy.orange.elephant | Q1和Q2 | Q1:orange,Q2:lazy |
quick.orange.fox | Q1 | |
lazy.brown.fox | Q2 | |
lazy.pink.rabbit | Q2 | |
quick.brown.fox | 无 | |
quick.orange.male.rabbit | 无 | 队列1匹配的是3个. |
lazy.orange.male.rabbit | Q2 | #的作用是多个或零个 |
结尾
本文涉及到的交换所内置的交互类型:
DIRECT("direct") 扇出:特性是广播,在这个交换所的所有接收队列都会收到发送者的消息。
FANOUT("fanout") 直接:特性是 路由匹配。
TOPIC("topic") 主题:特性是路由匹配的升级,直接是一个字符串的,主题可以为指定前缀类型。
HEADERS("headers"); headers 匹配 AMQP 消息的 header 而不是路由键,听说几乎不用了 故博主未了解。
都看到这里了,还不点赞收藏一下吗?谢谢支持
唠叨唠叨,第一次正式的写博文,感觉掘金体验不是非常好。