阅读 15

ActiveMQ

简介及安装

官网地址
下载下来直接解压,进入bin目录,执行命令./activemq start即可启动,以下是相关目录结构说明

  • bin存放的是脚本文件
  • conf存放的是基本配置文件
  • data存放的是日志文件
  • docs存放的是说明文档
  • examples存放的是简单的实例
  • lib存放的是activemq所需jar包
  • webapps用于存放项目的目录\

默认的服务端口为61616,不过该端口可以在conf目录下的activemq.xml中进行修改,找到transportConnectors标签,修改openwire中的端口即可。
监控平台默认的端口为8161,如果要修改该端口,可以修改conf/jetty.xml文件中的jetty启动端口。默认的用户名和密码为admin/admin,user/user,如果要修改用户名和密码,则在conf/jetty-realm.properties中进行修改即可,格式为[用户名:密码,角色名],关于web管理界面部分列说明如下:

  • Number Of Consumers:这个是消费者端的消费者数量
  • Number Of Pending Messages:等待消费的消息 “这个是当前未出队列的数量。可以理解为总接收数-总出队列数”
  • Messages Enqueued:进入队列的消息 “进入队列的总数量,包括出队列的。 这个数量只增不减”
  • Messages Dequeued:出了队列的消息 “可以理解为是消费这消费掉的数量”

springBoot集成

  1. 引入jar包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
复制代码
  1. 配置文件
# MQ地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
#集群配置
#spring.activemq.broker-url=failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617)
# 在考虑结束之前等待的时间
#spring.activemq.close-timeout=15s
# 是否启用内存模式
spring.activemq.in-memory=true 
# 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
spring.activemq.non-blocking-redelivery=false
# 等待消息发送响应的时间。设置为0等待永远。
spring.activemq.send-timeout=0
#默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置,消费端使用
spring.jms.pub-sub-domain=true
#账号
spring.activemq.user=admin
# 密码
spring.activemq.password=admin
复制代码
  1. 消息发送端代码,我将消息生成和目的地解耦了,这样更有利于扩展
@Service("producer")
public class Producer {
	@Autowired
	private JmsMessagingTemplate jmsTemplate;
	
	public void sendMessage(Destination destination, final Object msg){
		jmsTemplate.convertAndSend(destination, msg);
	}
}
@Configuration
public class DestinationConfig {

	@Bean(name = "testTopic")
	public Topic getTestTopic(){
		return new ActiveMQTopic("topic.test");
	}
	@Bean(name = "testQueue")
	public Queue getTestQueue(){
		return new ActiveMQQueue("queue.test");
	}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
	@Autowired
	private Producer producer;

	@Resource(name = "testTopic")
	private Topic testTopic;
	@Resource(name = "testQueue")
	private Queue testQueue;

	@Test
	public void sendTopicMsg(){
		producer.sendMessage(testTopic, "This is test Topic");
	}

	@Test
	public void sendQueueMsg(){
		producer.sendMessage(testQueue, "This is test Queue");
	}

}
复制代码
  1. 消费端代码
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
 * containerFactory配置请参考第5点说明
 * 该配置用来指明该destination是队列还是主题消息
 */
@Component
public class Consumer {
	@JmsListener(destination = "topic.test", containerFactory = "jmsListenerContainerTopic")
	public void recevieTopicMsg(String msg){
		System.out.println("接收的主题消息为:"+msg);
	}

	@JmsListener(destination = "queue.test", containerFactory = "jmsListenerContainerQueue")
	//@SendTo("otherTopic") // 表示将方法的返回值发送到另外的队列,含有这个注解的话,则方法需要返回值
	public void recevieQueueMsg(String msg){
		System.out.println("接收的队列消息为:"+msg);
	}
}
复制代码
  1. 如果项目既需要监听topic,也需要监听queue,则需要单独配置,代码如下
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@Configuration
public class MqConfig {
	@Bean
	public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
		DefaultJmsListenerContainerFactory beanFactory = new DefaultJmsListenerContainerFactory();
		beanFactory.setPubSubDomain(true);
		beanFactory.setConnectionFactory(connectionFactory);
		return beanFactory;
	}

	@Bean
	public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
		DefaultJmsListenerContainerFactory beanFactory = new DefaultJmsListenerContainerFactory();
		beanFactory.setConnectionFactory(connectionFactory);
		return beanFactory;
	}
}
复制代码
  1. 在实际应用中,我们发送消息通常希望知道消息是否发送成功,下面代码我通过线程池发送消息,并且返回发送是否成功信息,仅供参考,如果有更好方式,欢迎留言讨论交流。
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

import javax.annotation.Resource;
import javax.jms.*;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.ProducerCallback;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class.getName());

    private static ExecutorService pool = Executors.newCachedThreadPool();

    @Autowired
    private JmsTemplate jmsTemplate;

    @Resource(name = "testTopic")
    private Topic testTopic;
    public void sendTechStatRefresh(String json) {
        try {
            sendMsgResultNotification(sendTopicMsg(testTopic, json), "主题消息测试", json);
        } catch (Throwable throwable) {
            LOGGER.error(throwable.getMessage(), throwable);
        }
    }

    @Resource(name = "testQueue")
    private Queue testQueue;
    public void sendLotteryMatchScore(String msg) {
        try {
            sendMsgResultNotification(sendQueueMsg(testQueue, msg), "队列消息测试", msg);
        } catch (Throwable throwable) {
            LOGGER.error(throwable.getMessage(), throwable);
        }
    }

    private boolean sendTopicMsg(final Topic topic, final String msg) {
        if (topic == null || StringUtils.isBlank(msg)) {
            throw new NullPointerException("topic is null or msg is null");
        }
        try {
            return pool.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    jmsTemplate.convertAndSend(topic, msg);
                    return Boolean.TRUE;
                }
            }).get();
        } catch (Throwable throwable) {
            try {
                LOGGER.error("send msg to topic error:topic = " + topic.getTopicName() + " msg = " + msg.toString(), throwable);
            } catch (Exception e) {
                LOGGER.error("发送主题消息异常", e);
            }
        }
        return Boolean.FALSE;
    }

    private boolean sendQueueMsg(final Queue queue,final String msg) {
        if (queue == null || StringUtils.isBlank(msg)) {
            throw new NullPointerException("topic is null or msg is null");
        }

        try {
            return pool.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    jmsTemplate.convertAndSend(queue, msg);
                    return Boolean.TRUE;
                }
            }).get();
        } catch (Throwable throwable) {
            try {
                LOGGER.error("send msg to queue error:queue = " + queue.getQueueName() + " msg = " + msg.toString(),
                        throwable);
            } catch (Exception e) {
                LOGGER.error("发送队列消息异常", e);
            }
        }
        return Boolean.FALSE;
    }

    private void sendMsgResultNotification(final boolean rs, final String msgType, final String data) {
        if (rs) {
            LOGGER.debug("推送 " + msgType + "成功 : 消息[" + data + "]");
        }
    }
}
复制代码

AQMP协议

  • AMQP协议中的元素包括:Message(消息体)、Producer(消息生产者)、Consumer(消息消费者)、Virtual Host(虚拟节点)、Exchange(交换机)、Queue(队列)等;
  • 由Producer(消息生产者)和Consumer(消息消费者)构成了AMQP的客户端,他们是发送消息和接收消息的主体。AMQP服务端称为Broker,一个Broker中一定包含完整的Virtual Host(虚拟主机)、 Exchange(交换机)、Queue(队列)定义;
  • 一个Broker可以创建多个Virtual Host(虚拟主机),我们将讨论的Exchange和Queue都是虚拟机中的工作元素(还有User元素)。注意,如果AMQP是由多个Broker构成的集群提供服务,那么一个Virtual Host也可以由多个Broker共同构成;
  • Connection是由Producer(消息生产者)和Consumer(消息消费者)创建的连接,连接到Broker物理节点上。但是有了Connection后客户端还不能和服务器通信,在Connection之上客户端会创建Channel,连接到Virtual Host或者Queue上,这样客户端才能向Exchange发送消息或者从Queue接受消息。一个Connection上允许存在多个Channel,只有Channel中能够发送/接受消息。
  • Exchange元素是AMQP协议中的交换机,Exchange可以绑定多个Queue也可以同时绑定其他Exchange。消息通过Exchange时,会按照Exchange中设置的Routing(路由)规则,将消息发送到符合的Queue或者Exchange中。

STOMP协议

STOMP是一个简单的可互操作的协议, 被用于通过中间服务器在客户端之间进行异步消息传递。它定义了一种在客户端与服务端进行消息传递的文本格式。 STOMP是基于帧的协议,它假定底层为一个2-way的可靠流的网络协议(如TCP)。客户端和服务器通信使用STOMP帧流通讯。

参考文档:blog.csdn.net/u012758088/…

关注下面的标签,发现更多相似文章
评论