spring-messaging

1,109 阅读3分钟

spring-messaging是Spring Framework框架4.0推出的一个子模块,对消息的编程模型进行了统一。比如MessageMessageChannelMessageHandler,用来作为基于消息的应用的基础模块;内部还包含一组注解,比如@MessageMapping、@Payload和@Header,用于映射消息与方法之间的关系(类型Spring MVC请求和方法的映射编程模型)。

消息编程模型的统一

不论是Apache RocketMQ的Message,还是Apache Kafka的ProducerRecord,在spring-messaging中被统一称为org.springframework.messaging.Message接口

public interface Message<T> {
    // 用于获取消息体
    T getPayload();
    // 用于获取消息头 MessageHeaders是一个实现了java.util.Map<String,Object>接口的类
    MessageHeaders getHeaders();
}

消息(Message)是一个接口,具体的实现类有以下几种:

  • GenericMessage:普通消息,这是一个不可变(immutable)的消息,无法新增、修改和删除Header中的数据。
  • ErrorMessage:错误消息,如果Payload是一个Throwable异常,那么对应的消息就是错误消息。
  • MutableMessage:可变消息,跟GenericMessage的区别是可以新增、修改、删除Header中的数据

消息的发送和订阅

有了消息之后,需要把消息发送到Topic(Broker)里,这个Topic对应的编程模型是消息通道(MessageChannel)。

  • 调用MessageChannelsend方法可以将消息发送到这个MessageChannel
public interface MessageChannel {

   long INDEFINITE_TIMEOUT = -1;

   default boolean send(Message<?> message) {
      return send(message, INDEFINITE_TIMEOUT);
   }

   boolean send(Message<?> message, long timeout);

}
  • PollableChannel是一种以“拉”的方式获取消息通道,可以调用receive方法去拉取消息通道内的消息
public interface PollableChannel extends MessageChannel {

   @Nullable
   Message<?> receive();

   @Nullable
   Message<?> receive(long timeout);

}
  • SubscribableChannel是通过订阅的方式获取消息
public interface SubscribableChannel extends MessageChannel {

   boolean subscribe(MessageHandler handler);

   boolean unsubscribe(MessageHandler handler);

}
  • 消息通道拦截器(ChannelInterceptor)用于在MessageChannel发送消息前、发送消息后、发送消息完成时(发送异常也算完成)、消息接收前、消息接收后、接收完成时(发送异常也算完成)进行拦截

image.png

示例代码

@Slf4j 
public class MyPollableChannel implements PollableChannel { 
    private BlockingQueue queue = new ArrayBlockingQueue<>(1000);
    
    @Override
    public Message<?> receive() {
        return queue.poll();
    }

    @Override
    public Message<?> receive(long l) {
        try {
            return queue.poll(l, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("pollableChannel receive error", e);
        }
        return null;
    }

    @Override
    public boolean send(Message<?> message, long l) {
        return queue.add(message);
    }


    public static void main(String[] args) {
        MyPollableChannel channel = new MyPollableChannel();
        channel.send(MessageBuilder
                .withPayload("costom payload1")
                .setHeader("k1", "v1")
                .build());
        channel.send(MessageBuilder
                .withPayload("costom payload2")
                .setHeader("k2", "v2")
                .build());
        channel.send(MessageBuilder
                .withPayload("costom payload3")
                .setHeader("ingore", true)
                .build());
        System.out.println(channel.receive());
        System.out.println(channel.receive());
        System.out.println(channel.receive());
        System.out.println(channel.receive());
    }
}
public class MySubscribableChannel extends AbstractSubscribableChannel {

    private Random random = new Random();

    @Override
    protected boolean sendInternal(Message<?> message, long l) {
        if (message == null || CollectionUtils.isEmpty(getSubscribers())) {
            return false;
        }
        Iterator<MessageHandler> iterator = getSubscribers().iterator();

        int index = 0, targetIndex = random.nextInt(getSubscribers().size());
        while (iterator.hasNext()) {
            MessageHandler handler = iterator.next();
            if (index == targetIndex) {
                handler.handleMessage(message);
                return true;
            }
            index++;
        }

        return false;
    }

    public static void main(String[] args) {
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger failCount = new AtomicInteger(0);
        MySubscribableChannel channel = new MySubscribableChannel();

        channel.addInterceptor(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                // 拦截header ignore=true的消息
                String ignoreKey = "ignore";
                if (message.getHeaders().containsKey(ignoreKey) && message.getHeaders().get(ignoreKey, Boolean.class)) {
                    return null;
                }

                return message;
            }

            @Override
            public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
                if (sent) {
                    successCount.incrementAndGet();
                } else {
                    failCount.incrementAndGet();
                }

            }

        });
        // 发送正常消息,因此时没有订阅,本次发送无任何反馈,send返回值为false
        channel.send(MessageBuilder
                .withPayload("costom payload1")
                .setHeader("k1", "v1")
                .build());
        // 创建三个订阅
        channel.subscribe(msg -> {
            System.out.println("[" + Thread.currentThread().getName() + "] handler1 receive: " + msg);
        });
        channel.subscribe(msg -> {
            System.out.println("[" + Thread.currentThread().getName() + "] handler2 receive: " + msg);
        });
        channel.subscribe(msg -> {
            System.out.println("[" + Thread.currentThread().getName() + "] handler3 receive: " + msg);
        });
        // 发送正常消息,会随机匹配一个订阅被消费
        channel.send(MessageBuilder
                .withPayload("costom payload2")
                .setHeader("k1", "v1")
                .build());
        // 发送带ignore的消息,会被忽略
        channel.send(MessageBuilder
                .withPayload("costom payload2")
                .setHeader("ignore", true)
                .build());

        System.out.println("successCount:" + successCount.get() + "," + "failCount:" + failCount.get());
    }

}