跟我学RocketMQ之批量消息发送源码解析

1,477 阅读5分钟

上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送。本文中,我们就一起来集中分析一下批量消息的发送是怎样的逻辑。

DefaultProducer.send

RocketMQ提供了批量发送消息的API,同样在DefaultProducer.java中

    @Override
    public SendResult send(
        Collection<Message> msgs) throws MQClientException, RemotingException, 
            MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(batch(msgs));
    }

它的参数为Message集合,也就是一批消息。它的另外一个重载方法提供了发送超时时间参数

    @Override
    public SendResult send(Collection<Message> msgs,
        long timeout) throws MQClientException, RemotingException,
             MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(batch(msgs), timeout);
    }

可以看到是将消息通过batch()方法打包为单条消息,我们看一下batch方法的逻辑

DefaultProducer.batch

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        // 声明批量消息体
        MessageBatch msgBatch;
        try {
            // 从Message的list生成批量消息体MessageBatch
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            // 设置消息体,此时的消息体已经是处理过后的批量消息体
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        // 设置topic
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }

从代码可以看到,核心思想是将一批消息(Collection msgs)打包为MessageBatch对象,我们看下MessageBatch的声明

    public class MessageBatch extends Message implements Iterable<Message> {
        private final List<Message> messages;
        private MessageBatch(List<Message> messages) {
            this.messages = messages;
        }

可以看到MessageBatch继承自Message,持有List引用。

我们接着看一下generateFromList方法

MessageBatch.generateFromList

    public static MessageBatch generateFromList(Collection<Message> messages) {
        assert messages != null;
        assert messages.size() > 0;
        // 首先实例化一个Message的list
        List<Message> messageList = new ArrayList<Message>(messages.size());
        Message first = null;
        // 对messages集合进行遍历
        for (Message message : messages) {
            // 判断延时级别,如果大于0抛出异常,原因为:批量消息发送不支持延时
            if (message.getDelayTimeLevel() > 0) {
                throw new UnsupportedOperationException
                    ("TimeDelayLevel in not supported for batching");
            }
            // 判断topic是否以 **"%RETRY%"** 开头,如果是,
            // 则抛出异常,原因为:批量发送消息不支持消息重试
            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                throw new UnsupportedOperationException("Retry Group is not supported for batching");
            }
            // 判断集合中的每个Message的topic与批量发送topic是否一致,
            // 如果不一致则抛出异常,原因为:
            // 批量消息中的每个消息实体的Topic要和批量消息整体的topic保持一致。
            if (first == null) {
                first = message;
            } else {
                if (!first.getTopic().equals(message.getTopic())) {
                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
                }
                // 判断批量消息的首个Message与其他的每个Message实体的等待消息存储状态是否相同,
                // 如果不同则报错,原因为:批量消息中每个消息的waitStoreMsgOK状态均应该相同。
                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                }
            }
            // 校验通过后,将message实体添加到messageList中
            messageList.add(message);
        }
        // 将处理完成的messageList作为构造方法,
        // 初始化MessageBatch实体,并设置topic以及isWaitStoreMsgOK状态。
        MessageBatch messageBatch = new MessageBatch(messageList);
        messageBatch.setTopic(first.getTopic());
        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
        return messageBatch;
    }

总结一下,generateFromList方法对调用方设置的Collection集合进行遍历,经过前置校验之后,转换为MessageBatch对象并返回给DefaultProducer.batch方法中,我们接着看DefaultProducer.batch的逻辑。

到此,通过MessageBatch.generateFromList方法,将发送端传入的一批消息集合转换为了MessageBatch实体。

DefaultProducer.batch

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        // 声明批量消息体
        MessageBatch msgBatch;
        try {
            // 从Message的list生成批量消息体MessageBatch
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            // 设置消息体,此时的消息体已经是处理过后的批量消息体
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        // 设置topic
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }

注意下面这行代码:

        // 设置消息体,此时的消息体已经是处理过后的批量消息体
        msgBatch.setBody(msgBatch.encode());

这里对MessageBatch进行消息编码处理,通过调用MessageBatch的encode方法实现,代码逻辑如下:

    public byte[] encode() {
        return MessageDecoder.encodeMessages(messages);
    }

可以看到是通过静态方法 encodeMessages(List messages) 实现的。

我们看一下encodeMessages方法的逻辑:

    public static byte[] encodeMessages(List<Message> messages) {
        //TO DO refactor, accumulate in one buffer, avoid copies
        List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
        int allSize = 0;
        for (Message message : messages) {
            // 遍历messages集合,分别对每个Message实体进行编码操作,转换为byte[]
            byte[] tmp = encodeMessage(message);
            // 将转换后的单个Message的byte[]设置到encodedMessages中
            encodedMessages.add(tmp);
            // 批量消息的二进制数据长度随实际消息体递增
            allSize += tmp.length;
        }
        byte[] allBytes = new byte[allSize];
        int pos = 0;
        for (byte[] bytes : encodedMessages) {
            // 遍历encodedMessages,按序复制每个Message的二进制格式消息体
            System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
            pos += bytes.length;
        }
        // 返回批量消息整体的消息体二进制数组
        return allBytes;
    }

encodeMessages的逻辑在注释中分析的已经比较清楚了,其实就是遍历messages,并按序拼接每个Message实体的二进制数组格式消息体并返回。

我们可以继续看一下单个Message是如何进行编码的,调用了 MessageDecoder.encodeMessage(message) 方法,逻辑如下:

    public static byte[] encodeMessage(Message message) {
        //only need flag, body, properties
        byte[] body = message.getBody();
        int bodyLen = body.length;
        String properties = messageProperties2String(message.getProperties());
        byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
        //note properties length must not more than Short.MAX
        short propertiesLength = (short) propertiesBytes.length;
        int sysFlag = message.getFlag();
        int storeSize = 4 // 1 TOTALSIZE
            + 4 // 2 MAGICCOD
            + 4 // 3 BODYCRC
            + 4 // 4 FLAG
            + 4 + bodyLen // 4 BODY
            + 2 + propertiesLength;
        ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
        // 1 TOTALSIZE
        byteBuffer.putInt(storeSize);
        // 2 MAGICCODE
        byteBuffer.putInt(0);
        // 3 BODYCRC
        byteBuffer.putInt(0);
        // 4 FLAG
        int flag = message.getFlag();
        byteBuffer.putInt(flag);
        // 5 BODY
        byteBuffer.putInt(bodyLen);
        byteBuffer.put(body);
        // 6 properties
        byteBuffer.putShort(propertiesLength);
        byteBuffer.put(propertiesBytes);
        return byteBuffer.array();
    }

这里其实就是将消息按照RocektMQ的消息协议进行编码,格式为:

    消息总长度          ---  4字节
    魔数                --- 4字节
    bodyCRC校验码       --- 4字节
    flag标识            --- 4字节
    body长度            --- 4字节
    消息体              --- 消息体实际长度N字节
    属性长度            --- 2字节
    扩展属性            --- N字节

通过encodeMessage方法处理之后,消息便会被编码为固定格式,最终会被Broker端进行处理并持久化。

其他

到此便是批量消息发送的源码分析,实际上RocketMQ在处理批量消息的时候是将其解析为单个消息再发送的,这样就在底层统一了单条消息、批量消息发送的逻辑,让整个框架的设计更加健壮,也便于我们进行理解学习。

后续的发送流程这里就不再重复展开了,感兴趣的同学可以移步我们的上一篇文章查看

跟我学RocketMQ之消息发送源码解析

批量消息的源码分析就暂时告一段落,更多的源码分析随后奉上,感谢您的阅读。


版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。