金石原创 |【深度挖掘RocketMQ底层源码】「底层系列」深度挖掘RocketMQ底层导致消息丢失透析(Broker Busy和ToManyRequest)

2,204 阅读5分钟

本文正在参加「金石计划」

承接上文

通过上一篇文章《【深度挖掘 RocketMQ 底层源码】「底层问题分析系列」深度挖掘 RocketMQ 底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]》,我们知道了对应的“[REJECTREQUEST]system busy, start flow control for a while”的主要原因。

回顾问题要点

再次追踪以下问题代码的未知,我们可以看到其抛出的源码入口点:NettyRemotingAbstract#processRequestCommand。

上面的原理分析部分已经详细介绍其实现原理,总结如下:

主要由两种场景的考虑

  • 在不开启transientStorePoolEnable机制时,如果Broker PageCache繁忙时则抛出上述错误,判断PageCache繁忙的依据就是向PageCache追加消息时,如果持有锁的时间超过1s,则会抛出该错误。
  • 在开启transientStorePoolEnable机制时,其判断依据是如果TransientStorePool中不存在可用的堆外内存时抛出该错误。

本章内容方向

  • too many requests and system thread pool busy, RejectedExecutionException

  • [PC_SYNCHRONIZED]broker busy, start flow control for a while

  • [PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d

问题:too many requests and system thread pool busy, RejectedExecutionException

其抛出的源码入口点:NettyRemotingAbstract#processRequestCommand,其调用地方紧跟3.1,是在向线程池执行任务时,被线程池拒绝执行时抛出的,我们可以顺便看看Broker消息处理发送的线程信息:

BrokerController#registerProcessor

主要看的是队列的BlockedQueue的长度进行控制说明:

该线程池的队列长度默认为10000,我们可以通过sendThreadPoolQueueCapacity来改变默认值,如下图所示。

private int sendThreadPoolQueueCapacity = 10000;

[PC_SYNCHRONIZED]broker busy

其抛出的源码入口点:DefaultMessageStore#putMessage,在进行消息追加时,再一次判断PageCache是否繁忙,如果繁忙,则抛出上述错误。

    @Override
    public boolean isOSPageCacheBusy() {
        long begin = this.getCommitLog().getBeginTimeInLock();
        long diff = this.systemClock.now() - begin;
        return diff < 10000000
            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
    }
    @Override
    public long lockTimeMills() {
        return this.commitLog.lockTimeMills();
    }
    public SystemClock getSystemClock() {
        return systemClock;
    }
    public CommitLog getCommitLog() {
        return commitLog;
    }

上述的代码已经在上一章节已经完成了介绍,对应的服务处理和逻辑引发的问题,大家可以关注上一章的内容即可。

broker busy, period in queue: %sms, size of queue: %d

其源码的入口点

BrokerFastFailure#cleanExpiredRequest,该方法的调用频率为每隔10s中执行一次,不过有一个执行前提条件就是Broker端要开启快速失败,默认为开启,可以通过参数brokerFastFailureEnable来设置。该方法的实现要点是每隔10s,检测一次,如果检测到PageCache繁忙,并且发送队列中还有排队的任务,则直接不再等待,直接抛出系统繁忙错误,使正在排队的线程快速失败,结束等待。

不靠谱得方案

消息发送时抛出system busy、broker busy的原因都是PageCache繁忙,那是不是可以通过调整上述提到的某些参数来避免抛出错误呢?例如,如下参数:

osPageCacheBusyTimeOutMills

设置PageCache系统超时的时间,默认为1000,表示1s,那是不是可以把增加这个值,例如设置为2000或3000。不推荐

sendThreadPoolQueueCapacity

Broker服务器处理的排队队列,默认为10000,如果队列中积压了10000个请求,则会抛出RejectExecutionException。不推荐

brokerFastFailureEnable

是否启用快速失败,默认为true,表示当如果发现Broker服务器的PageCache繁忙,如果发现sendThreadPoolQueue队列中不为空,表示还有排队的发送请求在排队等待执行,则直接结束等待,返回broker busy。那如果不开启快速失败,则同样可以避免抛出这个错误。不推荐

问题总结

修改上述参数,都不可取,原因是出现system busy、broker busy这个错误,其本质是系统的PageCache繁忙,通俗一点讲就是向PageCache追加消息时,单个消息发送占用的时间超过1s了,如果继续往该Broker服务器发送消息并等待,其TPS根本无法满足,哪还是高性能的消息中间了呀。

靠谱得方案

开启transientStorePoolEnable,在broker配置文件中将transientStorePoolEnable设置为true。这个方案在上一篇文章已经介绍过了,再次就不过多赘余。

它得问题重点是集中于

会增加数据丢失的可能性,如果Broker JVM进程异常退出,提交到PageCache中的消息是不会丢失的,但存在堆外内存(DirectByteBuffer)中但还未提交到PageCache中的这部分消息,将会丢失。但通常情况下,RocketMQ进程退出的可能性不大。

扩容Broker服务器

当Broker服务器自身比较忙的时候,才会采用快速失败机制,直接给消息发送者返回错误,消息发送者默认情况会重试2次,将消息发往其他Broker,保证其高可用,并且在接下来的一段时间内会规避该Broker,这样该Broker恢复提供了时间保证,Broker本身的架构是支持分布式水平扩容的,增加Topic的队列数,降低单台Broker服务器的负载,从而避免出现PageCache。

Broker扩容时候,可以复制集群中任意一台Broker服务下${ROCKETMQ_HOME}/store/config/topics.json到新Broker服务器指定目录,避免在新Broker服务器上为Broker创建队列,然后,消息发送者、消息消费者都能动态获取Topic的路由信息

本文正在参加「金石计划」