spring-messaging是Spring Framework框架4.0推出的一个子模块,对消息的编程模型进行了统一。比如
Message
、MessageChannel
和MessageHandler
,用来作为基于消息的应用的基础模块;内部还包含一组注解,比如@MessageMapping、@Payload和@Header,用于映射消息与方法之间的关系(类型Spring MVC请求和方法的映射编程模型)。
消息编程模型的统一
不论是Apache RocketMQ的
Message
,还是Apache Kafka的ProducerRecord
,在spring-messaging中被统一称为org.springframework.messaging.Message
接口
public interface Message<T> {
// 用于获取消息体
T getPayload();
// 用于获取消息头 MessageHeaders是一个实现了java.util.Map<String,Object>接口的类
MessageHeaders getHeaders();
}
消息(Message)是一个接口,具体的实现类有以下几种:
GenericMessage
:普通消息,这是一个不可变(immutable)的消息,无法新增、修改和删除Header中的数据。ErrorMessage
:错误消息,如果Payload
是一个Throwable
异常,那么对应的消息就是错误消息。MutableMessage
:可变消息,跟GenericMessage
的区别是可以新增、修改、删除Header
中的数据
消息的发送和订阅
有了消息之后,需要把消息发送到
Topic(Broker)
里,这个Topic对应的编程模型是消息通道(MessageChannel
)。
- 调用
MessageChannel
的send
方法可以将消息发送到这个MessageChannel
中
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
boolean send(Message<?> message, long timeout);
}
PollableChannel
是一种以“拉”的方式获取消息通道,可以调用receive
方法去拉取消息通道内的消息
public interface PollableChannel extends MessageChannel {
@Nullable
Message<?> receive();
@Nullable
Message<?> receive(long timeout);
}
SubscribableChannel
是通过订阅的方式获取消息
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
- 消息通道拦截器(
ChannelInterceptor
)用于在MessageChannel发送消息前、发送消息后、发送消息完成时(发送异常也算完成)、消息接收前、消息接收后、接收完成时(发送异常也算完成)进行拦截
示例代码
@Slf4j
public class MyPollableChannel implements PollableChannel {
private BlockingQueue queue = new ArrayBlockingQueue<>(1000);
@Override
public Message<?> receive() {
return queue.poll();
}
@Override
public Message<?> receive(long l) {
try {
return queue.poll(l, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("pollableChannel receive error", e);
}
return null;
}
@Override
public boolean send(Message<?> message, long l) {
return queue.add(message);
}
public static void main(String[] args) {
MyPollableChannel channel = new MyPollableChannel();
channel.send(MessageBuilder
.withPayload("costom payload1")
.setHeader("k1", "v1")
.build());
channel.send(MessageBuilder
.withPayload("costom payload2")
.setHeader("k2", "v2")
.build());
channel.send(MessageBuilder
.withPayload("costom payload3")
.setHeader("ingore", true)
.build());
System.out.println(channel.receive());
System.out.println(channel.receive());
System.out.println(channel.receive());
System.out.println(channel.receive());
}
}
public class MySubscribableChannel extends AbstractSubscribableChannel {
private Random random = new Random();
@Override
protected boolean sendInternal(Message<?> message, long l) {
if (message == null || CollectionUtils.isEmpty(getSubscribers())) {
return false;
}
Iterator<MessageHandler> iterator = getSubscribers().iterator();
int index = 0, targetIndex = random.nextInt(getSubscribers().size());
while (iterator.hasNext()) {
MessageHandler handler = iterator.next();
if (index == targetIndex) {
handler.handleMessage(message);
return true;
}
index++;
}
return false;
}
public static void main(String[] args) {
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
MySubscribableChannel channel = new MySubscribableChannel();
channel.addInterceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
// 拦截header ignore=true的消息
String ignoreKey = "ignore";
if (message.getHeaders().containsKey(ignoreKey) && message.getHeaders().get(ignoreKey, Boolean.class)) {
return null;
}
return message;
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
if (sent) {
successCount.incrementAndGet();
} else {
failCount.incrementAndGet();
}
}
});
// 发送正常消息,因此时没有订阅,本次发送无任何反馈,send返回值为false
channel.send(MessageBuilder
.withPayload("costom payload1")
.setHeader("k1", "v1")
.build());
// 创建三个订阅
channel.subscribe(msg -> {
System.out.println("[" + Thread.currentThread().getName() + "] handler1 receive: " + msg);
});
channel.subscribe(msg -> {
System.out.println("[" + Thread.currentThread().getName() + "] handler2 receive: " + msg);
});
channel.subscribe(msg -> {
System.out.println("[" + Thread.currentThread().getName() + "] handler3 receive: " + msg);
});
// 发送正常消息,会随机匹配一个订阅被消费
channel.send(MessageBuilder
.withPayload("costom payload2")
.setHeader("k1", "v1")
.build());
// 发送带ignore的消息,会被忽略
channel.send(MessageBuilder
.withPayload("costom payload2")
.setHeader("ignore", true)
.build());
System.out.println("successCount:" + successCount.get() + "," + "failCount:" + failCount.get());
}
}