消息队列学习基础

4,353 阅读15分钟

什么是MOM

MOM 就是面向消息中间件(Message-oriented middleware),是用于以分布式应用或系统中的异步、松耦合、可靠、可扩展和安全通信的一类软件。MOM 的总体思想是它作为消息发送器和消息接收器之间的消息中介,这种中介提供了一个全新水平的松耦合。

MOM思想就是A和B两个应用程序不直接发送消息。之前A和B直接发送消息有很多效率问题,如A发送之后B没有及时接受,那么A就一直再在那里堵塞并发性不好,A必须等B接受完之后有返结果了A才可以结束。而MOM就是为了解决这样的问题,不让A与B之间交互,在A和B之间加一个消息中间件,A把消息放到消息中间上,就可以走了,去做别的事情,B什么时候来消息中间件取消息A不用知道也不用管。这样就提高了效率提供并发性,等B去走后可以通过状态,通知,回调等方式通知A就可以。市面上实现这种思想的技术有很多,IBM(MQSEVICES)、Microsoft(MSMQ)以及BEA的MessageMQ等。处于百家争鸣阶段都是各自实现各自的,没有统一实现标准。此时SUN为了实现统一标准就出现了JMS统一实现规范。JMS主要有2种消息模型,点到点和发布订阅两种。

什么是消息队列

消息队列是在消息的传输过程中保存消息的容器,用于接收消息并以文件的方式存储,一个消息队列可以被一个也可以被多个消费者消费。

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前在生产环境,使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

消息队列优点

  1. 将数据从一个应用程序传到另一个应用程序,或者从软件的一个模块传送到另外一个模块
  2. 负责建立网络通信的通道,进行数据的可靠传送
  3. 保证数据不重发,不丢失
  4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务

消息队列的应用场景

下面详细介绍一下消息队列在实际应用中常用的使用场景。场景分为异步处理、应用解耦、流量削锋和消息通讯四个场景。

异步处理

场景说明 用户注册后,需要发送注册邮件和发送注册信息,传统的做法有两种:串行方式并行方式

串行方式

将注册信息写入数据库成功后,发送注册邮件,然后发送注册短信,而所有任务执行完成后,返回信息给客户端


并行方式

将注册信息写入数据库成功后,同时进行发送注册邮件和发送注册短信的操作。而所有任务执行完成后,返回信息给客户端。同串行方式相比,并行方式可以提高执行效率,减少执行时间。

上面的比较可以发现,假设三个操作均需要50ms的执行时间,排除网络因素,则最终执行完成,串行方式需要150ms,而并行方式需要100ms。

因为cpu在单位时间内处理的请求数量是一致的,假设:CPU每1秒吞吐量是100此,则串行方式1秒内可执行的请求量为1000/150,不到7次;并行方式1秒内可执行的请求量为1000/100,为10次。

由上可以看出,传统串行和并行的方式会受到系统性能的局限,那么如何解决这个问题?
我们需要引入消息队列,将不是必须的业务逻辑,异步进行处理,由此改造出来的流程为


根据上述的流程,用户的响应的时间基本相当于将数据写入数据库的时间,发送注册邮件,发送注册短信的消息在写入消息队列后,即可返回执行结果,写入消息队列的时间很快,几乎可以忽略,也有此可以将系统吞吐量提升至20QPS,比串行方式提升近3倍,比并行方式提升2倍。

应用解耦

场景说明 用户下单后,订单系统需要通知库存系统。

传统的做法为:订单系统调用库存系统的接口。如下图所示:


传统方式具有如下缺点:
  1. 假设库存系统访问失败,则订单减少库存失败,导致订单创建失败
  2. 订单系统同库存系统过度耦合

如何解决上述的缺点呢?需要引入消息队列,引入消息队列后的架构如下图所示:


引入消息队列,实现应用解耦
  • 订单系统:用户下单后,订单系统进行数据持久化处理,然后将消息写入消息队列,返回订单创建成功
  • 库存系统:使用拉/推的方式,获取下单信息,库存系统根据订单信息,进行库存操作。

假如在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其后续操作了。由此实现了订单系统与库存系统的应用解耦。

流量削锋

流量削峰 也是消息对列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景 秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

  1. 可以控制参与活动的人数;
  2. 可以缓解短时间内高流量对应用的巨大压力;

流量削锋处理方式系统图如下:


流量削锋方式系统图
  1. 服务器在接收到用户请求后,首先写入消息队列。这时如果消息队列中消息数量超过最大数量,则直接拒绝用户请求或返回跳转到错误页面;
  2. 秒杀业务根据秒杀规则读取消息队列中的请求信息,进行后续处理。

日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:


消息队列应用于日志处理的架构
  • 日志采集客户端:负责日志数据采集,定时写受写入Kafka队列;
  • Kafka消息队列:负责日志数据的接收,存储和转发;
  • 日志处理应用:订阅并消费kafka队列中的日志数据;

这种架构在实际开发中的应用,可以参照案例:新浪技术分享:我们如何扛下32亿条实时日志的分析处理

服务的技术架构设计

  1. Kafka:接收用户日志的消息队列。
  2. Logstash:做日志解析,统一成JSON输出给Elasticsearch。
  3. Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。
  4. Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。

消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列、聊天室等。

点对点通讯

点对点通讯架构设计

在点对点通讯架构设计中,客户端A和客户端B共用一个消息队列,即可实现消息通讯功能。

聊天室通讯

聊天室通讯架构设计

客户端A、客户端B、直至客户端N订阅同一消息队列,进行消息的发布与接收,即可实现聊天通讯方案架构设计。

JMS消息服务

讲消息队列就不得不提JMS。JMS(Java Message Service,Java消息服务) JMS 叫做 Java 消息服务(Java Message Service),是 Java 平台上有关面向 MOM 的技术规范,旨在通过提供标准的产生、发送、接收和处理消息的 API 简化企业应用的开发,类似于 JDBC 和关系型数据库通信方式的抽象。

API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建,发送,接收和读取消息。他是分布式通信耦合度更低,消息服务更加可靠以及异步性。

在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。


常用概念

  • Provider:纯 Java 语言编写的 JMS 接口实现(比如 ActiveMQ 就是)
  • Domains:消息传递方式,包括点对点(P2P)、发布/订阅(Pub/Sub)两种
  • Connection factory:客户端使用连接工厂来创建与 JMS provider 的连接
  • Destination:消息被寻址、发送以及接收的对象

消息模型

在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)

P2P 模式


P2P(点对点)模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,知道他们被消费或者超时。

P2P 消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收。

多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。


P2P的特点

  1. 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中,其他的消费者就不能得到这条消息了。)
  2. 发送者和接收者质检在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,他不会影响到消息发送到队列。
  3. 消费者必须确认对消息的接收

    收到消息后消费者必须确认消息已被接收,否则JMS服务提供者会认为该消息没有被接收,那么这条消息仍然可以被其他人接收。程序可以自动进行确认,不需要人工干预。

  4. 非持久的消息最多只发送一次

    非持久的消息最多只发送一次,表示消息有可能未被发送,造成未被发送的原因可能有:

    1、 JMS服务提供者出现宕机等情况,造成非持久信息的丢失

    2、 队列中的消息过期,未被接收

  5.  持久的消息严格发送一次

    我们可以将比较重要的消息设置为持久化的消息,持久化后的消息不会因为JMS服务提供者的故障或者其他原因造成消息丢失。

如果希望发送的每个消息都会被成功处理的话,那么需要p2p 模式

Pub/Sub模式



包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber)。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。

除非显式指定,否则 topic 不会为订阅者保留消息。当然,这可以通过持久化(Durable)订阅来实现消息的保存。这种情况下,当订阅者与 Provider 断开时,Provider 会为它存储消息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的消息。

Pub/Sub的特点

  • 每个消息都可以有多个(0,1,……)订阅者,每条消息可以有多个消费者,如果报纸和杂志一样,谁订阅了谁都可以获得。

  • 发布者和订阅者之间有时间上的依赖性。订阅者只能消费他们订阅之后出版的消息,针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。这就要求订阅者必须先订阅,生产者再发布。即订阅者必须先运行,再等待生产者的运行,这和点对点类型有所差异。
  • 为了消费消息,订阅者必须保持运行的状态。即订阅者必须保持活动状态等待发布者发布的消息,如果订阅者在发布者发布消息之后才运行,则不能获得先前发布者发布的消息。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来小消费消息。

  1. 同步
    订阅者或接收者通过receive方法来接受消息,receive在接收到消息之前(或超时之前)将一直阻塞。
  2. 异步
    订阅者或接收者亦可以注册未一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage的方法。
JDNI:Java命名和目录接口,是一种标准的Java命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或者命名服务中的一个记录,同时返回资源连接建立所必需的信息。

JNDI在JMS中起到查找二号访问发送目标或消息来源的作用。

JMS编程

JMS通用步骤

  • 获取连接工厂
  • 使用连接工厂创建连接
  • 启动连接
  • 从连接创建会话
  • 获取 Destination
  • 创建 Producer,或
    • 创建 Producer
    • 创建 message
  • 创建 Consumer,或发送或接收message发送或接收 message
    • 创建 Consumer
    • 注册消息监听器(可选)
  • 发送或接收 message
  • 关闭资源(connection, session, producer, consumer 等)

JMS编程模型

1.ConnectionFactory

创建Connection对象的工厂,针对两周不同的JMS消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

2.Destination

Destination的意思是消息生产者的消息发送目标或着说消息消费者的消息来源。对于消息生产者来说。他的Destination是某个队列(queue)或者某个主题(Topic);对于消息消费者来说,他的Destination也是某个队列或主题(即消息来源)。

所以,Destination实际上就是两种类型的对象:Queue,Topic可以通过JNDI来查找Destination

3.Connection

Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP Socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

4.Session

Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

5.消息的生产者

消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

6.消息消费者

消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

7. MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

深入学习JMS对掌握JAVA架构、EJB架构有很好的帮助,消息中间件也是大型分布式系统必须的组件。本次分享主要做全局性介绍,具体的深入需要大家学习,实践,总结,领会。

JMS编程实战

这里拿ActiveMQ 举例

public class JMSDemo {
        ConnectionFactory connectionFactory;
        Connection connection;
        Session session;
        Destination destination;
        MessageProducer producer;
        MessageConsumer consumer;
        Message message;
        boolean useTransaction = false;
        try {
                Context ctx = new InitialContext();
                connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
                //使用ActiveMQ时:connectionFactory = new ActiveMQConnectionFactory(user, password, getOptimizeBrokerUrl(broker));
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("TEST.QUEUE");
                //生产者发送消息
                producer = session.createProducer(destination);
                message = session.createTextMessage("this is a test");

                //消费者同步接收
                consumer = session.createConsumer(destination);
                message = (TextMessage) consumer.receive(1000);
                System.out.println("Received message: " + message);
                //消费者异步接收
                consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                                if (message != null) {
                                        doMessageEvent(message);
                                }
                        }
                });
        } catch (JMSException e) {
                ...
        } finally {
                producer.close();
                session.close();
                connection.close();
        }
}