RocketMQ实战-优雅的并发编程技巧

1,512 阅读5分钟

往期回顾

基于V4.9.4版本的RocketMQ源码解析专栏

RocketMQ实战-对于消息生产者使用的一些思考

RocketMQ实战-初探PushConsumer限流机制实现原理

上一篇文章着重分析了RocketMQ限流机制的实现原理,分别从客户端或者服务端各自不同的侧重点分开来讲述了一下。本篇文章,我们着重来分析梳理一下在并发编程上面,RocketMQ给我们带来了哪些优雅的实现方式,这其中蕴含着怎样的并发编程思想,我们能够从中学习借鉴到什么,这些疑问,在接下来的文章中将会重点描述。

本期重点内容

RocketMQ作为一款非常优秀的消息中间件,以其高性能、高吞吐量、低延迟著称。要实现这一系列的特性,在高并发编程上面的考虑以及设计肯定是少不了的,本篇我们选取其中几个经典的使用方式来梳理一下,看看RocketMQ是如何设计并使用的。

信号量semaphore使用

信号量有一个非常经典的使用场景,就是限流。通过信号量的使用继而可以控制并发度,在RocketMQ的使用场景就是消息的发送,在消息发送上面细分的话,就是涉及到异步发送和单向发送两个应用场景。通过信号量的使用,可以防止发送时产生过多的请求挂起,在一定程度上保护了客户端以及服务端的运行。

JDK semaphore使用示例

image.png

currentSemaphore.tryAcquire:该方法尝试在指定的时间内获取一个信号许可,如果当前没有可用的信号,则会返回false。

currentSemaphore.release():该方法为释放信号许可,前提是已经获取到了一个信号许可,若没有获取到许可的前提下仍然调用这个方法,则会引起许可的超发,会影响semaphore整体的流量控制。

由currentSemaphore.release()的方法分析可以看出,若release方法调用不当则会引起比较严重的超发问题,在上面的示例中,可以非常好的判断是否需要释放以及释放的时机,但是逻辑在复杂一点呢,条件在多一点呢,则可能会不好控制release的释放。

我们接下来看一下RocketMQ是如何使用Semaphore的,在复杂条件下是如何控制release的释放的

image.png

RocketMQ对semaphore做了一层的包装,新增了一个released的属性,通过这个属性的CAS机制来进行semaphore的release释放。

image.png

在调用的过程中,每个调用的线程栈都会持有一个SemaphoreReleaseOnlyOnce实例。

在这种组合的机制下,能够确保一个业务线程只会归还一次,不会出现超发的情况。

同步转异步编程技巧

image.png

在并发编程模型中,这是一个经典的并发设计模式-Future

Future模式指的是主线程向一个线程提交任务时返回一个Future,此时主线程不会阻塞,还可以做其他的事情,等需要异步执行结果时,调用Future的get方法。如果异步结果已经执行完成就立即获取结果,如果未执行完,则主线程阻塞等待执行结果。

一般情况下,Future模型通常需要一个线程池对象来进行配合实现,并且需要封装一个task请求。这种模式使用起来不是很方便。RocketMQ并没有使用Future的设计模式,而是采用了CountDownLatch来实现了同样的效果。

在业务线程调用invokeSyncImpl方法时,会首先构建一个ResponseFuture对象,这个对象的内部会封装一个CountDownLatch的对象实例。在调用完Netty的api后会调用waitResponse方法来阻塞等待响应结果。在给定的时间内等待Netty线程的唤醒。

image.png

waitResponse业务线程阻塞等待在给定的时间内

image.png

当客户端向服务端发送完请求并获取到响应后,由Netty的线程异步执行putResponse方法进行唤醒。

CompletableFuture的应用

image.png

CompletableFuture的应用在RocketMQ中主要体现在刷盘策略和同步策略上面,引入CompletableFuture,让实现真正的异步编程成为了可能。

image.png

以同步刷盘举例,当一条消息的写入请求到达服务端时,会调用SendMessageProcessor处理器的方法进行消息的写入,使用的线程是SendMessageProcessor处理器对应的线程,在调用同步刷盘方法时,会返回一个CompletableFuture对象,此时可能并没有完成数据的落盘,但是不会阻塞住当前线程,使得当前线程能够快速的返回,处理更多的写入请求,增加服务端的数据吞吐量。

这种优化思路引入书本中的解释为:将消息发送的处理逻辑与返回响应结果到客户端这两个步骤再次进行解耦,再使用一个线程池来处理同步刷盘或者同步复制的结果,然后在另外一个线程中将响应结果通过网络写入,这样SendMessageProcessor就减少了阻塞,提高了主线程的消息处理速度。

image.png

image.png

同步刷盘完成后,会调用complete,将结果通知,等同步策略也完成时,会将结果写回到客户端去,此时使用的线程就不在是SendMessageProcessor处理器对应的线程了。

本篇总结

本篇文章介绍了RocketMQ中间件对于并发编程在实现上的技巧以及一些优化思路,掌握这些技巧以及思路,对我们的工作以及学习也是非常有帮助的。