SpringCloud前置知识+RabbitMQ

3,865 阅读23分钟

一、微服务架构介绍

1.单体架构

​ 单体架构也被称为单体应用,它是将所有的功能模块全部耦合在一个项目中

1.1 单体架构特点

​ 1.最终会被打包成一个独立的单元(一个唯一 的jar包或war包)

​ 2.会以一个进程的方式来运行

1.2 单体架构的优点与缺点

优点

  • 项目易于管理
  • 部署简单

缺点

  • 测试成本高
  • 可伸缩性差
  • 可靠性差
  • 迭代困难
  • 跨语言程度差
  • 团队协作难

2.微服务架构

2.1 什么是微服务

微服务是一种架构风格。一个大型的复杂软件应用,由一个或多个微服务组成。系统中 的各个微服务可被独立部署,各个微服务之间是松耦合的。每个微服务仅关注于完成一件任 务并很好的完成该任务。

2.2 常见的架构风格
  • 客户端与服务端的
  • 基于组件模型的架构(EJB)
  • 分层架构(MVC)
  • 面向服务架构(SOA)
2.3 微服务的特点
  • 系统是由多个服务构成
  • 每个服务可以单独独立部署
  • 每个服务之间是松耦合的,服务内部高内聚,服务外部低耦合,高内聚就是一个项目专注的完成一个功能
2.4 微服务的优点与缺点

优点:

  • 容易测试
  • 可伸缩性强
  • 可靠性强
  • 跨语言程度更加灵活
  • 团队协作容易
  • 系统迭代容易

缺点:

  • 运维成本过高,部署的数量较多
  • 接口兼容多版本
  • 分布式系统的复杂性
  • 分布式事务

二、MVC、RPC、SOA、微服务架构之间的区别

1.MVC架构

其实MVC就是传统的单体架构

代表技术:SpringMVC、Spring、MyBatis等等。

2.RPC架构

RPC(Remote Procedure Call):远程过程调用。他一种通过网络从远程计算机程序上请求 服务,而不需要了解底层网络技术的协议。

代表技术:Thrift、Hessian 等等

3. SOA架构

SOA(Service oriented Architecture):面向服务架构

ESB(Enterparise Servce Bus):企业服务总线,服务中介。主要是提供了一个服务于服务之间的交互。 ESB 包含的功能如:负载均衡,流量控制,加密处理,服务的监控,异常处理,监控 告急等等。

代表技术:Mule、WSO2

4.微服务架构

微服务就是一个轻量级的服务治理方案。

代表技术:SpringCloud、dubbo 等等

三、微服务的设计原则

  • AKF拆分原则
  • 前后端分离原则
  • 无状态服务
  • RestFul通信风格

1. AKF拆分原则

业界对于可扩展的系统架构设计有一个朴素的理念,就是: **通过加机器就可以解决容量和可用性问题。(如果一台不行那就两台) **

这一理念在“云计算”概念疯狂流行的今天,得到了广泛的认可!对于一个规模 迅速增长的系统而言,容量和性能问题当然是首当其冲的。但是随着时间的向前, 系统规模的增长,除了面对性能与容量的问题外,还需要面对功能与模块数量上 的增长带来的系统复杂性问题以及业务的变化带来的提供差异化服务问题。而许多系统,在架构设计时并未充分考虑到这些问题,导致系统的重构成为常态,从 而影响业务交付能力,还浪费人力财力!对此,《可扩展的艺术》一书提出了一 个更加系统的可扩展模型—— AKF 可扩展立方 (Scalability Cube)。这个立方 体中沿着三个坐标轴设置分别为:X、Y、Z。

  • Y 轴(功能) —— 关注应用中功能划分,基于不同的业务拆分
  • X 轴(水平扩展) —— 关注水平扩展,也就是”加机器解决问题”
  • Z 轴(数据分区) —— 关注服务和数据的优先级划分,如按地域划分
1.1 Y轴(功能 )

Y 轴扩展会将庞大的整体应用拆分为多个服务。每个服务实现一组相关的功 能,如订单管理、客户管理等。在工程上常见的方案是服务化架构(SOA) 。比 如对于一个电子商务平台,我们可以拆分成不同的服务,组成下面这样的架构:

但通过观察上图容易发现,当服务数量增多时,服务调用关系变得复杂。为系统添加一个新功能,要调用的服务数也变得不可控,由此引发了服务管理上的混乱。所以,一般情况下,需要采用服务注册的机制形成服务网关来进行服务治理。系统的架构将变成下图所示:

1.2 X轴(水平扩展)

X 轴扩展与我们前面朴素理念是一致的,通过绝对平等地复制服务与数据, 以解决容量和可用性的问题。其实就是将微服务运行多个实例,做集群加负载均衡的模式。

为了提升单个服务的可用性和容量, 对每一个服务进行 X 轴扩展划分。

1.3 Z轴(数据分区)

Z 轴扩展通常是指基于请求者或用户独特的需求,进行系统划分,并使得划分出来的子系统是相互隔离但又是完整的。以生产汽车的工厂来举例:福特公司为了发展在中国的业务,或者利用中国的廉价劳动力,在中国建立一个完整的子 工厂,与美国工厂一样,负责完整的汽车生产。这就是一种Z轴扩展。

工程领域常见的 Z 轴扩展有以下两种方案:

  • 单元化架构

    在分布式服务设计领域,一个单元(Cell)就是满足某个分区所有业务操作 的自包含闭环。如上面我们说到的Y轴扩展的 SOA 架构,客户端对服务端节点 的选择一般是随机的,但是,如果在此加上Z轴扩展,那服务节点的选择将不再是随机的了,而是每个单元自成一体。如下图:

  • 数据分区

    为了性能数据安全上的考虑,我们将一个完整的数据集按一定的维度划分出 不同的子集。 一个分区(Shard),就是是整体数据集的一个子集。比如用尾号 来划分用户,那同样尾号的那部分用户就可以认为是一个分区。数据分区为一般 包括以下几种数据划分的方式:

    1. 数据类型(如:业务类型)
    2. 数据范围(如:时间段,用户 ID)
    3. 数据热度(如:用户活跃度,商品热度)
    4. 按读写分(如:商品描述,商品库存)

2. 前后端分离原则

何为前后端分离?前后端本来不就分离么?这要从尴尬的 jsp 讲起。分工精细化从来都 是蛋糕做大的原则,多个领域工程师最好在不需要接触其他领域知识的情况下合作,才可能 使效率越来越高,维护也会变得简单。jsp 的模板技术融合了 html 和 java 代码,使得传统 MVC 开发中的前后端在这里如胶似漆,前端做好页面,后端转成模板,发现问题再找前端, 前端又看不懂 java 代码......前后端分离的目的就是将这尴尬局面打破。 前后端分离原则,简单来讲就是前端和后端的代码分离,我们推荐的模式是最好采用物 理分离的方式部署,进一步促使更彻底的分离。如果继续直接使用服务端模板技术,如:jsp, 把 java、js、html、css 都堆到一个页面里,稍微复杂一点的页面就无法维护了。

这种分离方式有几个好处:

  • 前后端技术分离,可以由各自的专家来对各自的领域进行优化,这样前段的用户体 验优化效果更好。
  • 分离模式下,前后端交互界面更清晰,就剩下了接口模型,后端的接口简洁明了, 更容易维护。
  • 前端多渠道集成场景更容易实现,后端服务无需变更,采用统一的数据和模型,可 以支持多个前端:例如:微信 h5 前端、PC 前端、安卓前端、IOS 前端。

3. 无状态服务

对于无状态服务,首先说一下什么是状态:如果一个数据需要被多个服务共 享,才能完成一笔交易,那么这个数据被称为状态。进而依赖这个“状态”数据的 服务被称为有状态服务,反之称为无状态服务。

那么这个无状态服务原则并不是说在微服务架构里就不允许存在状态,表达 的真实意思是要把有状态的业务服务改变为无状态的计算类服务,那么状态数据 也就相应的迁移到对应的“有状态数据服务”中。

场景说明:例如我们以前在本地内存中建立的数据缓存、Session 缓存,到 现在的微服务架构中就应该把这些数据迁移到分布式缓存中存储,让业务服务变 成一个无状态的计算节点。迁移后,就可以做到按需动态伸缩,微服务应用在运 行时动态增删节点,就不再需要考虑缓存数据如何同步的问题。

4. RestFul的通讯风格

作为一个原则来讲本来应该是个“无状态通信原则”,在这里我们直接推荐一 个实践优选的 Restful 通信风格 ,因为他有很多好处:

  • 无状态协议 HTTP,具备先天优势,扩展能力很强。例如需要安全加密,有 现成的成熟方案 HTTPS 即可。
  • JSON 报文序列化,轻量简单,人与机器均可读,学习成本低,搜索引擎友好。
  • 语言无关,各大热门语言都提供成熟的 Restful API 框架,相对其他的一些 RPC 框架生态更完善。

四、SpringCloud简介

1.什么是SpringCloud

SpringCloud是一个服务治理平台,提供了一些服务框架。包含了:服务注册 与发现、配置中心、消息中心 、负载均衡、数据监控等等。

Spring Cloud 是一个微服务框架,**相比 Dubbo 等 RPC 框架, Spring Cloud 提 供的全套的分布式系统解决方案。 **

Spring Cloud 对微服务基础框架 Netflix 的多个开源组件进行了封装,同时又实现 了和云端平台以及和 Spring Boot 开发框架的集成。

Spring Cloud 为微服务架构开发涉及的配置管理,服务治理,熔断机制,智能路由, 微代理,控制总线,一次性 token,全局一致性锁,leader 选举,分布式 session,集 群状态管理等操作提供了一种简单的开发方式。

Spring Cloud 为开发者提供了快速构建分布式系统的工具,开发者可以快速的启动 服务或构建应用、同时能够快速和云平台资源进行对接。

2. SpringCloud的项目的位置

SpingCloud是Spring 的一个顶级项目与 SpringBoot、SpringData 位于同一位置。

3.SpringCloud的子项目

3.1 SpringCloud Config

​ 配置管理工具,支持使用 Git 存储配置内容,支持应 用配置的外部化存储,支持客户端配置信息刷新、加解密配置内容等

3.2 SpringCloud Bus

​ 事件、消息总线,用于在集群(例如,配置变化事件)中 传播状态变化,可与 Spring Cloud Config 联合实现热部署。

3.3 Spring Cloud Netflix>

​ **针对多种 Netflix 组件提供的开发工具包,其中包括 Eureka、Hystrix、Zuul、Archaius 等。 **

  • Netflix Eureka

    一个基于 rest 服务的服务治理组件,包括服务注册中心、服务注册与服务发现机制的实现,实现了云端负载均衡和中间层服务器的故障转移。

  • Netflix Hystrix

    容错管理工具,实现断路器模式,通过控制服务的节点, 从而对延迟和故障提供更强大的容错能力。

  • Netflix Ribbon

    客户端负载均衡的服务调用组件。

  • Netflix Feign

    基于Ribbon和Hystrix的声明式服务调用组件。

  • Netflix Zuul

    微服务网关,提供动态路由,访问过滤等服务。

  • Netflix Archaius:

    配置管理 API,包含一系列配置管理 API,提供动 态类型化属性、线程安全配置操作、轮询框架、回调机制等功能。

3.4 Spring Cloud for Cloud Foundry

​ 通过 Oauth2 协议绑定服务到 CloudFoundry,CloudFoundry 是 VMware 推出的开源 PaaS 云平台。

3.5 Spring Cloud Sleuth

​ 日志收集工具包,封装了 Dapper,Zipkin 和 HTrace 操作。

3.6 Spring Cloud Data Flow

​ 大数据操作工具,通过命令行方式操作数据流。

3.7 Spring Cloud Security

​ 安全工具包,为你的应用程序添加安全控制,主要是指OAuth2。

3.8 Spring Cloud Consul

​ 封装了Consul操作,consul是一个服务发现与配置工具,与Docker容器可以无缝集成。

3.9 Spring Cloud Zookeeper

​ 操作Zookeeper的工具包 , 用使用zookeeper方式的服务注册和发现。

3.10 Spring Cloud Stream:

​ 数据流操作开发包,封装了Redis、Rabbit、Kafka 等发送接收消息。

3.11 Spring Cloud CLI

​ 基于 Spring Boot CLI,可以让你以命令行方式快速建立云组件。

五、SpringCloud与Dubbo的区别

六、 Spring Cloud版本说明

1.常见版本号说明

​ 软件版本号:2.0.2.RELEASE

  • 2:主版本号。当功能模块有较大更新或者整体架构发生变化时,主版本号会更新

  • 0:次版本号。次版本表示只是局部的一些变动。

  • 2:修改版本号。一般是 bug 的修复或者是小的变动

  • RELEASE:希腊字母版本号。次版本号用户标注当前版本的软件处于哪个开发阶段

1.1希腊字母版本号
  • Base:设计阶段。只有相应的设计没有具体的功能实现
  • Alpha:软件的初级版本。存在较多的 bug
  • Bate:表示相对 alpha 有了很大的进步,消除了严重的 bug,还存在一些潜在的 bug。
  • Release:该版本表示最终版。

2. Spring Cloud 版本号说明

2.1为什么 Spring Cloud 版本用的是单词而不是数字?

设计的目的是为了更好的管理每个 Spring Cloud 的子项目的清单。避免子的版本号与子 项目的版本号混淆。

2.2 版本号单词的定义规则

采用伦敦的地铁站名称来作为版本号的命名,根据首字母排序,字母顺序靠后的版本号越大。

2.3 版本发布计划说明

七、RabbitMQ

1.什么是RabbitMQ

MQ全称Message Queue(消息队列),是一种应用程序与应用程序之间通信的一种方式,应用程序可以通过向消息队列中读写消息进行信息的交互,而不是以应用相互调用的方式进行通信。

2.安装RabbitMQ

2.1 安装rabbitmq所需要的依赖包
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
2.2 下载安装包(cd /usr/local/software)
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

2.3 安装服务命令
#第一步:安装erlang语言环境
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
#第二步:安装socat加解密软件
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
#第三步:最后安装rabbitmq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
2.4 修改集群用户与连接心跳检测

**注意修改vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件 **

修改:loopback_users 中的 <<"guest">>,只保留guest(不修改只能通过localhost访问)

2.5 修改本机系统文件
#修改 
vim /etc/rabbitmq/rabbitmq-env.conf

添加:NODENAME=rabbit

#修改
vim /etc/hostname

#修改本地文件 
vim /etc/hosts

#验证服务器是可用的
rabbitmq-server start &

**执行管控台插件 **

rabbitmq-plugins enable rabbitmq_management

#检查端口
lsof -i:5672

#通过
ps -ef|grep rabbitmq

访问地址:http://192.168.159.8:15672

#下载延时插件:
wget https://dl.bintray.com/rabbitmq/communityplugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215- 3.6.x.zip  
#解压延时插件
unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
#把延时插件拷贝到指定目录下
cp rabbitmq_delayed_message_exchange-20171215-3.6.x.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins
#启动延时插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.命令行和管控台

**开启管控台插件 rabbitmq-plugus rabbitmq_management 来开启管控台 **

测试连接: http://ip:15672(来访问) 用户名密码 guest/guest

3.1 管理控制台命令

a.起停服务命令

#启动服务
rabbitmqctl start_app

启动rabbitmq节点保证需要erlang虚拟机节点起来才能执行)

#停止服务
rabbitmqctl stop_app

停止rabbtimq节点,但是不会停止erlang节点rabbitmqctl stop都会停止

#查看服务状态 
rabbtimqctl status

b. 用户操作命令

#查看所有用户列表
rabbitmqctl list_users
#添加用户
rabbitmqctl add_user luyi luyi
#设置rabbitmq用户的角色
rabbitmqctl set_user_tags luyi administrator
#为用户设置权限
rabbitmqctl set_permissions -p / luyi ".*" ".*" ".*"

rabbitmqctl set_permissions -p <虚拟机> <用户名> ".*" ".*" ".*"
#列出用户权限
rabbitmqctl list_user_permissions luyi
#清除用户权限
rabbitmqctl clear_permissions -p <虚拟机> <用户名>
rabbitmqctl clear_permissions -p / root
#删除用户
rabbitmqctl delete_user root #root是用户名
#修改密码
rabbitmqctl change_password 用户名 新密码

c.虚拟主机操作

rabbitmqctl add_vhost /cloudmall 增加一个虚拟主机

rabbitmqctl list_vhosts; 查看所有的虚拟主机

rabbitmqctl list_permissions -p /cloudmall 查看虚拟主机的权限

rabbitmqctl delete_vhost /cloudmall 删除虚拟主机

d. 操作队列命令

rabbitmqctl list_queues 查询所有队列

rabbitmqctl -p vhostpath purge_queue blue 清除队列消息

e. 高级命令

rabbitmqctl reset 移除所有数据 该命令需要在 rabbitmqctl stop_app命令之后才执行(也就是说 在服 务停止后) r

abbitmqctl join_cluster [--ram] 组成集群命令

rabbitmqctl cluster_status 查看集群状态

rabbitmqctl change_cluster_node_type dist|ram 修改集群节点存储数据模式

rabbitmqctl forget_cluster_node [--offline]忘记节点 (摘除节点)

rabbitmqctc rename_cluster_node oldnode1 newnode1 oldnode2 newnode2 修改节点名称

4.为什么使用RabbitMQ

4.1 实现异步

4.2 解耦

4.3 流量削峰

5.消息队列基础知识

5.1 Provider

消息生产者,也就是发送消息的程序

5.2 Consumer

消息消费者,也就是接收消息的程序

5.3 没有使用消息队列的通信方式

5.4 使用消息队列后的通信方式

5.5 什么是队列

队列就像生活中的商店,商品制造商是消息生产者,顾客是消息消费者,他们之间通过商店实现交互

5.6 队列和应用程序的关系

多个消息生产者可以将消息发送到同一个消息队列中,多个消息消费者可以从同一个消息队列中取出消息。

6.RabbitMQ入门

6.1 创建项目
6.2 添加依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
6.3 配置全局配置文件
#设置应用的名称
spring.application.name=springcloud01-mq

#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi
6.4 创建配置类
/**
 * Author: LuYi
 * Date: 2019/11/3 14:03
 * Description: 队列配置类
 */
@Configuration
public class QueueConfig {

    /**
     * 创建队列
     * @return
     */
    @Bean
    public Queue createQueue(){
        return new Queue("hello-queue");
    }
}
6.5 编写消息发送者
/**
 * Author: LuYi
 * Date: 2019/11/3 14:06
 * Description: 消息发送者
 */
@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息的方法
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数一: 队列名称
        //参数二: 消息
        rabbitTemplate.convertAndSend("hello-queue", msg);
    }
}
6.6 编写消息接收者
/**
 * Author: LuYi
 * Date: 2019/11/3 14:12
 * Description: 消息接收者
 */
@Component
public class Receiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitListener(queues = "hello-queue")
    public void process(String msg){

        System.out.println("Receiver : " + msg);
    }
}
6.7 测试
/**
 * 消息队列测试类
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Springcloud01MqApplication.class)
class Springcloud01MqApplicationTests {

	@Autowired
	private Sender sender;

	@Test
	void contextLoads() {
	}

	/**
	 * 测试消息队列
	 */
	@Test
	public void test1(){
		sender.send("Hello RabbitMQ");
	}
}

7.RabbitMQ原理图

1.Message(消息):消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。

2.Publisher(生产者):向交换机发送消息的客户端程序

3.Consumer(消费者):从消息队列中获取消息的客户端程序

4.Exchange(交换机):用来接收客户端发过来的消息,并将这些消息路由给服务器中的消息队列

​ 三种常用的交换机:

​ a.direct(发布与订阅,完全匹配)

​ b.fanout(广播)

​ c.topic(主题,规则匹配)

5.Binding(绑定):用于交换机与消息队列之间的关联,一个绑定就是根据路由键将交换机与消息队列连接起来的规则。

6.Queue(消息队列):用于保存消息,直到将消息发送给消费者

7.Routing-key(路由键):RabbitMQ决定将消息投递到那个消息队列的规则。队列通过路由键绑定交换机,消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。

​ 如果匹配,消息会被发送到消息队列,如果不匹配,消息将进入黑洞。

8.Connection(链接):Rabbit服务器和服务之间建立的TCP连接

9.Channel(信道):信道是TCP中的虚拟链接,比如电缆就是TCP,信道则是一个独立光纤束。一条TCP连接建立多条信道是没有问题的。

​ TCP一旦打开,就会创建AMQP信道。无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的

10.Virtual Host(虚拟主机):表示一台交换机和消息队列。虚拟主机是共享相同的身份认证和加密环境的独立服务器域 每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。 vhost 是 AMQP 概念的基础,必须在链接时指定, RabbitMQ 默认的 vhost 是/  

11.Borker:表示消息队列的服务器实体

交换机和路由器的关系是什么?

交换机需要通过路由键和消息队列进行绑定,如果路由键相同,那么消息会被路由到对应的消息队列中,可以理解为路由键是决定消息发送到那个消息队列的规则

八、RabbitMQ交换器

1.Direct(发布与订阅 完全匹配)

1.1 需求

1.2 创建项目

rabbitmq-direct-consumer

rabbitmq-direct-provider

1.3 修改全局配置文件

consumer的配置文件

#设置应用的名称
spring.application.name=springcloud01-mq

#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi

#设置交换器的名称(处理日志的交换器)
mq.config.exchange=log.direct

#info级别的队列名称
mq.config.queue.info=log.info

#info的路由键
mq.config.queue.info.routing.key=log.info.routing.key

#error级别的队列名称
mq.config.queue.error=log.error

#error的路由键
mq.config.queue.error.routing.key=log.error.routing.key

provider的配置文件

#设置应用的名称
spring.application.name=springcloud01-mq

#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi

#设置交换器的名称(处理日志的交换器)
mq.config.exchange=log.direct

#info的路由键
mq.config.queue.info.routing.key=log.info.routing.key

#error的路由键
mq.config.queue.error.routing.key=log.error.routing.key
1.4 编写consumer

InfoReceiver

/**
 * Author: LuYi
 * Date: 2019/11/3 14:12
 * Description: 消息接收者
 *
 * @RabbitListener bindings:绑定队列
 *
 * @QueueBinding value:绑定队列名称
 *                exchange:配置交换器
 *
 * @Queue value:配置队列名称
 *         autoDelete:是否为可删除的临时队列
 *
 * @Exchange value:为交换器起名
 *            type:指定当前交换器的类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.info.routing.key}"
        )
)
public class InfoReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){

        System.out.println("InfoReceiver : " + msg);
    }
}

ErrorReceiver

/**
 * Author: LuYi
 * Date: 2019/11/3 14:12
 * Description: 消息接收者
 *
 * @RabbitListener bindings:绑定队列
 *
 * @QueueBinding value:绑定队列名称
 *                exchange:配置交换器
 *
 * @Queue value:配置队列名称
 *         autoDelete:是否为可删除的临时队列
 *
 * @Exchange value:为交换器起名
 *            type:指定当前交换器的类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.error.routing.key}"
        )
)
public class ErrorReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){

        System.out.println("ErrorReceiver : " + msg);
    }
}
1.5 编写provider
/**
 * Author: LuYi
 * Date: 2019/11/3 14:06
 * Description: 消息发送者
 */
@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //交换器名称
    @Value("${mq.config.exchange}")
    private String exchange;

    //路由键名称
    @Value("${mq.config.queue.error.routing.key}")
    private String routingKey;

    /**
     * 发送消息的方法
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数一: 交换器名称
        //参数二: 路由键
        //参数三: 消息
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}
1.6 测试
/**
 * 消息队列测试类
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Springcloud01MqApplication.class)
class Springcloud01MqApplicationTests {

	@Autowired
	private Sender sender;

	@Test
	void contextLoads() {
	}

	@Test
	public void test1() throws InterruptedException {
		while (true){
			Thread.sleep(1000);
			sender.send("Hello RabbitMQ");
		}
	}

}

2.Topic交换器(主题,规则匹配)

2.1 需求

2.2 创建项目

rabbitmq-topic-consumer

rabbitmq-topic-provider

2.3 修改配置文件

provider

#设置应用的名称
spring.application.name=springcloud01-mq

#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi

#设置交换器的名称(处理日志的交换器)
mq.config.exchange=log.topic

consumer

#设置应用的名称
spring.application.name=springcloud01-mq

#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi

#设置交换器的名称(处理日志的交换器)
mq.config.exchange=log.topic

#info级别的队列名称
mq.config.queue.info=log.info

#error级别的队列名称
mq.config.queue.error=log.error

#log级别的队列名称
mq.config.queue.all=log.all
2.4 编写provider

UserSender

/**
 * Author: LuYi
 * Date: 2019/11/3 14:06
 * Description: 消息发送者
 */
@Component
public class UserSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //交换器名称
    @Value("${mq.config.exchange}")
    private String exchange;

    /**
     * 发送消息的方法
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数一: 交换器名称
        //参数二: 路由键
        //参数三: 消息
        rabbitTemplate.convertAndSend(exchange, "user.log.debug", "user.log.debug  "+msg);
        rabbitTemplate.convertAndSend(exchange, "user.log.info", "user.log.info  "+msg);
        rabbitTemplate.convertAndSend(exchange, "user.log.warn", "user.log.warn  "+msg);
        rabbitTemplate.convertAndSend(exchange, "user.log.error", "user.log.error  "+msg);
    }
}

ProductSender

/**
 * Author: LuYi
 * Date: 2019/11/3 14:06
 * Description: 消息发送者
 */
@Component
public class ProductSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //交换器名称
    @Value("${mq.config.exchange}")
    private String exchange;

    /**
     * 发送消息的方法
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数一: 交换器名称
        //参数二: 路由键
        //参数三: 消息
        rabbitTemplate.convertAndSend(exchange, "product.log.debug", "product.log.debug  "+msg);
        rabbitTemplate.convertAndSend(exchange, "product.log.info", "product.log.info  "+msg);
        rabbitTemplate.convertAndSend(exchange, "product.log.warn", "product.log.warn  "+msg);
        rabbitTemplate.convertAndSend(exchange, "product.log.error", "product.log.error  "+msg);
    }
}

OrderSender

/**
 * Author: LuYi
 * Date: 2019/11/3 14:06
 * Description: 消息发送者
 */
@Component
public class OrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //交换器名称
    @Value("${mq.config.exchange}")
    private String exchange;

    /**
     * 发送消息的方法
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数一: 交换器名称
        //参数二: 路由键
        //参数三: 消息
        rabbitTemplate.convertAndSend(exchange, "order.log.debug", "order.log.debug  "+msg);
        rabbitTemplate.convertAndSend(exchange, "order.log.info", "order.log.info  "+msg);
        rabbitTemplate.convertAndSend(exchange, "order.log.warn", "order.log.warn  "+msg);
        rabbitTemplate.convertAndSend(exchange, "order.log.error", "order.log.error  "+msg);
    }
}
2.5 编写consumer

InfoReceiver

/**
 * Author: LuYi
 * Date: 2019/11/3 14:12
 * Description: 消息接收者
 *
 * @RabbitListener bindings:绑定队列
 *
 * @QueueBinding value:绑定队列名称
 *                exchange:配置交换器
 *
 * @Queue value:配置队列名称
 *         autoDelete:是否为可删除的临时队列
 *
 * @Exchange value:为交换器起名
 *            type:指定当前交换器的类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.info"
        )
)
public class InfoReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){

        System.out.println("InfoReceiver : " + msg);
    }
}

ErrorReceiver

/**
 * Author: LuYi
 * Date: 2019/11/3 14:12
 * Description: 消息接收者
 *
 * @RabbitListener bindings:绑定队列
 *
 * @QueueBinding value:绑定队列名称
 *                exchange:配置交换器
 *
 * @Queue value:配置队列名称
 *         autoDelete:是否为可删除的临时队列
 *
 * @Exchange value:为交换器起名
 *            type:指定当前交换器的类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.error"
        )
)
public class ErrorReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){

        System.out.println("ErrorReceiver : " + msg);
    }
}

AllReceiver

/**
 * Author: LuYi
 * Date: 2019/11/3 14:12
 * Description: 消息接收者
 *
 * @RabbitListener bindings:绑定队列
 *
 * @QueueBinding value:绑定队列名称
 *                exchange:配置交换器
 *
 * @Queue value:配置队列名称
 *         autoDelete:是否为可删除的临时队列
 *
 * @Exchange value:为交换器起名
 *            type:指定当前交换器的类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.all}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.*"
        )
)
public class AllReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){

        System.out.println("AllReceiver : " + msg);
    }
}
2.6 测试
/**
 * 消息队列测试类
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Springcloud01MqApplication.class)
class Springcloud01MqApplicationTests {

	@Autowired
	private UserSender userSender;
	@Autowired
	private ProductSender productSender;
	@Autowired
	private OrderSender orderSender;

	@Test
	void contextLoads() {
	}

	@Test
	public void test1() {
		userSender.send("UserSender...");
		productSender.send("ProductSender...");
		orderSender.send("OrderSender...");
	}

}

3. Fanout交换器(广播)

3.1 需求

3.2 创建项目

rabbitmq-fanout-consumer

rabbitmq-fanout-provider

3.3 修改配置文件

consumer

#设置应用的名称
spring.application.name=springcloud01-mq

#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi

#设置交换器的名称
mq.config.exchange=order.fanout

#短信队列名称
mq.config.queue.sms=order.sms

#push队列名称
mq.config.queue.push=order.push

provider

#设置应用的名称
spring.application.name=springcloud01-mq

#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi

#设置交换器的名称(处理日志的交换器)
mq.config.exchange=order.fanout
3.4 编写consumer

SmsReceiver

/**
 * Author: LuYi
 * Date: 2019/11/3 14:12
 * Description: 消息接收者
 *
 * @RabbitListener bindings:绑定队列
 *
 * @QueueBinding value:绑定队列名称
 *                exchange:配置交换器
 *
 * @Queue value:配置队列名称
 *         autoDelete:是否为可删除的临时队列
 *
 * @Exchange value:为交换器起名
 *            type:指定当前交换器的类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.sms}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
        )
)
public class SmsReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){

        System.out.println("SmsReceiver : " + msg);
    }
}

PushReceiver

/**
 * Author: LuYi
 * Date: 2019/11/3 14:12
 * Description: 消息接收者
 *
 * @RabbitListener bindings:绑定队列
 *
 * @QueueBinding value:绑定队列名称
 *                exchange:配置交换器
 *
 * @Queue value:配置队列名称
 *         autoDelete:是否为可删除的临时队列
 *
 * @Exchange value:为交换器起名
 *            type:指定当前交换器的类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.push}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
        )
)
public class PushReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){

        System.out.println("PushReceiver : " + msg);
    }
}
3.5 编写provider
/**
 * Author: LuYi
 * Date: 2019/11/3 14:06
 * Description: 消息发送者
 */
@Component
public class OrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //交换器名称
    @Value("${mq.config.exchange}")
    private String exchange;

    /**
     * 发送消息的方法
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数一: 交换器名称
        //参数二: 路由键
        //参数三: 消息
        rabbitTemplate.convertAndSend(exchange, "", msg);
    }
}
3.6 测试
/**
 * 消息队列测试类
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Springcloud01MqApplication.class)
class Springcloud01MqApplicationTests {

	@Autowired
	private OrderSender orderSender;

	@Test
	void contextLoads() {
	}

	@Test
	public void test1() throws InterruptedException {
		while (true){
			Thread.sleep(1000);
			orderSender.send("Hello RabbitMQ");
		}
	}

}

4.使用RabbbitMQ实现松耦合设计

4.1 修改配置文件
#设置应用的名称
spring.application.name=springcloud01-mq

#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi

#设置交换器的名称
mq.config.exchange=order.fanout

#短信队列名称
mq.config.queue.sms=order.sms

#push队列名称
mq.config.queue.push=order.push

#红包服务队列名称
mq.config.queue.red=order.red
4.2 添加receiver

RedReceiver

/**
 * Author: LuYi
 * Date: 2019/11/4 16:20
 * Description: 消息接收者
 *
 * @RabbitListener bindings:绑定队列
 *
 * @QueueBinding value:绑定队列名称
 *                exchange:配置交换器
 *
 * @Queue value:配置队列名称
 *         autoDelete:是否为可删除的临时队列
 *
 * @Exchange value:为交换器起名
 *            type:指定当前交换器的类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.red}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
        )
)
public class RedReceiver {

    @RabbitHandler
    public void process(String msg){
        System.out.println("给用户发送10元红包     " + msg);
    }
}

九、RabbitMQ消息处理

1. RabbitMQ的消息持久化处理

1.1 创建项目

rabbitmq-direct-durable-provider

rabbitmq-direct-durable-consumer

1.2 autoDelete

@Queue:当所有消费客户端连接断开后,是否自动删除队列,如果设置成true表示删除,false表示不删除

@Exchange:当所有绑定队列都不在使用时,是否自动删除交换机,true:删除,false:不删除

2.RabbitMQ中的消息确认ACK机制

  • 什么是消息确认ACK

    如果在消费者接收消息是,服务器出现了异常导致这条消息无法被接收,这是RabbitMQ可以通过ACK机制保证消息不丢失

  • ACK的消息确认机制

    ACK机制是消费者接收到消息之后向RabbitMQ发送的,RabbitMQ收到ACK后才会将该消息从消息队列中删除

    • 如果RabbitMQ没有接收到消费者的ACK标识,那么RabbitMQ会将这个消息放入消息队列中再次发送。
    • 如果在集群情况下,RabbitMQ会将这个消息推送给在线的其他消费者。这种机制确保了消费者服务端在发生故障时不会丢失任何消息。
    • 消息永远不会从RabbitMQ中删除,只有当消费者正确发送了ACK并且RabbitMQ也做出了确认之后才会进行删除。
    • 消息的ACK确认机制是默认打开的。
  • ACK机制的开发注意事项

    如果忘记了ACK,那么当Consumer退出时,Message会一直重发,这样就会占用大量的RabbitMQ的内存,由于RabbitMQ会长时间运行,所以这个情况是致命的。