繁杂网络IO型业务的分析及探索--协程和响应式

1,895

最近Caffe在尝试优化业务中发现,服务的心跳信息中有很多线程都是处于waiting,如下图所示:

thread count="608" 
daemon-count="420" 
peak-count="611" 
total-started-count="13722" 
deadlocked="0" new="0" runnable="169" blocked="0" 
waiting="314"

然后看了CPU的使用率,从左到右分别表示CPU的任务等待数/CPU核数CPU的执行时间占比总时间(CPU执行时间+CPU空闲时间+ CPU等待时间)当前JAVA进程执行时间占比总时间

图中可以清晰地看到,并不是计算型业务导致了线程等待,而是极大可能由于服务到底层数据查询的网络IO等待使得排队的线程增加,因此决定考虑优化这一部分。优化的目标,在保证服务和底层存储的心跳信息在一个安全的范围内,尽可能的增加服务吞吐能力。

思路一协程

当时优化的第一时间就想到了大名鼎鼎的quasar三方库。quasar可以理解为轻量级的线程实现,熟悉go语言一定知道goroutine,我们知道Java语言中不支持协程,业务中很多场景都需要用线程池进行优化,但是使用线程池的成本也很高,无论是内存占用还是线程之间的切换消耗,都限制了一个应用不能无限制的创建线程。

好在社区开源了一款Java coroutine框架quasar,容我先吐槽一下,这个框架真的是直男程序员写的(已经被拉去写JDK的协程,十分期待JDK能早点支持协程),文档十分匮乏,导致我本地开始搞得时候就报错了一把,开局体验不是很舒服。

当然优点也十分突出,应用中网络IO耗时占比比较突出的场景中,使用quasar可以极大的提高CPU的吞吐率。简单描述就是可以在更短的时间内处理更多的请求。不会因为一个线程中的网络IO堵塞而让后面的线程处于waiting中,堵塞的时候CPU是不干活的,因此将整个系统的吞吐率拉胯。

官网的文档中提供了两种使用方式,为了节约篇幅先用第1种方式示范一下使用方式:

  1. Running the Instrumentation Java Agent(加载器织入)
  2. Ahead-of-Time (AOT) Instrumentation(预编译织入)

这里我先用Gradle项目作为🌰来详解一下怎么使用。

一、Gradle配置模块


configurations {
    quasar
}
//
tasks.withType(JavaExec) {
    jvmArgs "-javaagent:${configurations.quasar.iterator().next()}"
}
//
dependencies {
    compile "org.antlr:antlr4:4.7.2"
    compile "co.paralleluniverse:quasar-core:0.7.5"
    quasar "co.paralleluniverse:quasar-core:0.7.5:jdk8@jar"
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

二、实现一个耳熟能详的echo服务器

两个Fiber(相当于是Java的Thread)相互通信,increasing发送一个int数字给echoecho收到之后返回给increasingincreasing接收到echo返回的消息,先打印,在执行++操作,然后打印出最后的结果。代码示例如下:

  1. increasing
        final IntChannel increasingToEcho = Channels.newIntChannel(0);
        final IntChannel echoToIncreasing = Channels.newIntChannel(0);
        //
        Fiber<Integer> increasing = new Fiber<>("INCREASING", new SuspendableCallable<Integer>() {
            @Override
            public Integer run() throws SuspendExecution, InterruptedException {
                int curr = 0;
                for (int i = 0; i < 10; ++i) {
                    Fiber.sleep(10);
                    System.out.println("INCREASING sending curr = " + curr);
                    increasingToEcho.send(curr);
                    curr = echoToIncreasing.receive();
                    System.out.println("INCREASING received curr = " + curr);
                    curr++;
                    System.out.println("INCREASING now curr = " + curr);
                }
                //
                System.out.println("INCREASING closing channel and exiting");
                increasingToEcho.close();
                return curr;
            }
        }).start();
  1. echo
        Fiber<Void> echo = new Fiber<Void>("ECHO", new SuspendableRunnable() {
            @Override
            public void run() throws SuspendExecution, InterruptedException {
                Integer curr;
                while(true) {
                    Fiber.sleep(1000);
                    curr = increasingToEcho.receive();
                    System.out.println("ECHO received curr = " + curr);
                    //
                    if (curr != null) {
                        System.out.println("ECHO sending curr = " + curr);
                        echoToIncreasing.send(curr);
                    } else {
                        System.out.println("ECHO 检测到关闭channel,closing and existing");
                        echoToIncreasing.close();
                        return;
                    }
                }
            }
        }).start();
  1. 运行increasingincreasing
        try {
            increasing.join();
            echo.join();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

可以看到,使用起来和Java中的Thread比较相似,API的语义清晰明了,减小了使用人员的成本。

三、原理及使用注意事项

1. Running the Instrumentation Java Agent

顾名思义,通过修改javaagent的方式,原理就是在classloading阶段动态的修改字节码。比如熟悉的AspectJ框架,核心就是ajc(编译器)织入器(weaver)达到不修改业务逻辑而修改字节码,ajc在java编译器的基础上,定义了一些aop语法,将符合这些语法的方法进行重新编译。分为预编译(CTW)加载期(LTW)后编译期(PTW) 3种织入方式。

quasarjvaagent就属于加载期(LTW) 织入的方式。同样也是在不影响正常编译的情况下,增加一些代码检测,当检测到某一个方法需要支持暂停功能的时候,进行重新编译,从而达到挂起方法保存上下文,阻塞完成后恢复执行下面的代码。

比如一个方法,在运行的时候需要请求网络,此时这个方法就被阻塞了,需要等网络请求返回在执行下面的代码。那么,当方法被阻塞的时候,就需要交给协程去控制,需要保存此时方法运行的上下文。当网络请求完成的时候,在执行方法的下文。粗略的概括一下协程的工作方式大致就是如此。

但是实际的场景会无比复杂。实际的一段阻塞的代码中,里面可会有多个阻塞的代码块,因此最顶端需要一个调度中心,只有当里面的阻塞代码块执行完了之后,才会执行外面的代码块,不然最后都乱套了。

这就很像java中的ForkJoinPool,一个大任务可以Fork出很多子任务,只有当子任务都完成执行,才会去执行父任务quasar也是如此,运行过程中将需要被挂起的方法和方法内的代码块交给调度中心,调度中心中存储任务之间的父子兄弟关系,然后按照任务层次关系执行代码。

1.1 quasar织入的条件

quasar会将满足下面条件的方法进行织入:

  1. 方法带有@Suspendable注解
  2. 方法抛出了异常SuspendExecution
  3. classpath下/META-INF/suspendables/META-INF/suspendable-supers指定了一些类或者接口,quasar会对这些类或者接口的方法进行分析,符合上面任意一种的方法进行织入
  4. 方法内部通过反射调用的方法,前后也会进行织入
  5. MethodHandle.invoke动态调用的方法前后进行织入
  6. JDK动态代理执行的代码块前后织入
  7. Java 8 lambdas调用前后织入

我们也可以从quasar的官网文档中看到依赖项,其中就有ASM — Java bytecode manipulation and analysis framework, by the ASM team,因此更多想了解织入的细节,大家可以去了解下ASM 框架。Caffe有空的时候也会单独出一篇文章科普下,因为这块的东西比较偏虚拟机底层。

1.2 quasar实际使用中需要对业务怎么改造?

如果业务中一个方法中有很多阻塞性业务,那么就要将这些阻塞性业务放入不同的Fiber执行,可以看到上文中的echoincreasing就属于两个阻塞型业务同时又相互依赖,逻辑上的依赖通过Channels解决。

1.3 兼容性问题

无论是通过javaagent还是AOT(预编译织入)的方式进行织入,本质上都是通过对字节码前后进行插入特定的指令。但是这种很容易带来一些兼容性问题,比如很多大厂都会通过pt-tracer这种染色技术,来对java线程进行着色,进行全链路的调用监控或者压测流量的区分。所以caffe思来想去就放弃了使用quasar这款伟大的协程框架,担心这种织入方式会不兼容线程中的染色

不过后面会尝试解决,毕竟quasar的性能让人看了不得不流口水。

思路二响应式编程

这块大家都应该很熟悉了吧,最出名的就数ReactiveX/RxJava,这款在android中最为被广泛使用,caffe在这使用RxJava3进行举例说明。

一、RxJava简介

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

翻译过来就是使用事件驱动实现异步的一款响应式框架。事件驱动相信写过前端很熟悉,这也是node.js社区所吹嘘的高并发RxJava底层利用发布订阅模式(与node.js底层模式相似)并且支持线程切换来完成在有限的时间内支持更高的并发。

1.RxJava相关概念介绍

既然是发布订阅模式,那必不可少的三要素发布者订阅者事件类型

1.1 事件类型

主要分为下面3种事件类型:

  1. Next,发布者可以发布多个Next事件,订阅者也可以订阅多个Next事件;
  2. Complete,订阅者接收到Complete事件便不再订阅发布者的事件;
  3. Error,发布者发布Error事件之后,便不再发布事件。订阅者接受Error事件也不会继续订阅事件。
1.2 echo服务中的发布、订阅、事件

increasing充当发布者的角色,每隔一段时间向echo推送一个数字类型的消息。echo服务接收到消息之后打印出来。

increasing

        // 发布者发送事件
        Observable increasing = Observable.create((emitter) -> {
            Thread.sleep(new Random().nextInt(1000));
            emitter.onNext(0);
            Thread.sleep(new Random().nextInt(1000));
            emitter.onNext(1);
            Thread.sleep(new Random().nextInt(1000));
            emitter.onNext(2);
            Thread.sleep(new Random().nextInt(1000));
            emitter.onComplete();
        });

echo

          // 创建订阅者
          Observer<Integer> echo = new Observer<Integer>() {
            private Disposable disposable;
            //
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("观察者开始订阅");
                disposable = d;
            }
            //
            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println("观察者接受到消息: " + integer);
            }
            // 
            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("观察者接收到报错: " + e.getMessage());
            }
            //
            @Override
            public void onComplete() {
                System.out.println("观察者订阅完成,不再继续订阅消息");
            }
        };

echo订阅increasing

increasing.subscribe(echo);

可以看到最后的执行结果:

观察者开始订阅
观察者接受到消息: 0
观察者接受到消息: 1
观察者接受到消息: 2
观察者订阅完成,不再继续订阅消息

因此,在RxJava中,Observable扮演发布者,Observer扮演订阅者,ObservableOnSubscribe.subscribe方法来完成事件的发布。发布和订阅之间的关联是通过Observable.subscribe来完成的。

1.3 发布者和订阅者线程切换

上面的实例代码所有的发布者和订阅者的代码都是在一个主线程中进行的。但是实际的业务场景中需要将发布者的业务,和订阅者的业务用不同的线程去完成,以减小业务的耗时。

线程切换的代码如下:

increasing

            CountDownLatch latch = new CountDownLatch(3);
            // 发布者发送事件
            ObservableOnSubscribe<Integer> onSubscribe = new ObservableOnSubscribe<Integer>() {
            //
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                System.out.println("发布者开始发布事件-Thread.currentThread().getName() = " + Thread.currentThread().getName());
                Thread.sleep(new Random().nextInt(1000));
                emitter.onNext(0);
                Thread.sleep(new Random().nextInt(1000));
                emitter.onNext(1);
                Thread.sleep(new Random().nextInt(1000));
                emitter.onNext(2);
                emitter.onComplete();
            }
          };
          Observable<Integer> increasing = Observable.create(onSubscribe);

echo

            // 订阅者接受事件
            Observer<Integer> echo = new Observer<Integer>() {
            private Disposable disposable;
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("订阅者开始订阅事件-" + Thread.currentThread().getName());
                disposable = d;
            }
            // 
            @Override
            public void onNext(@NonNull Integer integer) {
                latch.countDown();
                System.out.println("订阅者接收到事件-" + Thread.currentThread().getName() + "   " + integer);
            }
            //
            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("订阅者接收到报错,停止接受订阅事件-" + Thread.currentThread().getName());
            }
            //
            @Override
            public void onComplete() {
                System.out.println("订阅者接收到complete事件,停止接受订阅事件-" + Thread.currentThread().getName());
            }
        };

线程切换

        // 订阅者和发布者切换线程订阅
        increasing
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.trampoline())
                .subscribe(echo);
        //
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

最后的结果:

订阅者开始订阅事件-main
发布者开始发布事件-Thread.currentThread().getName() = RxCachedThreadScheduler-1
订阅者接收到事件-RxCachedThreadScheduler-1   0
订阅者接收到事件-RxCachedThreadScheduler-1   1
订阅者接收到事件-RxCachedThreadScheduler-1   2
订阅者接收到complete事件,停止接受订阅事件-RxCachedThreadScheduler-1

可以看到,发布者和订阅者分别在不同的线程中执行。其中Observable.subscribeOn(@NonNull Scheduler scheduler)是定义发布者方法执行的调度器,Observable。observeOn(@NonNull Scheduler scheduler)定义了订阅者方法的调度器。

调度器有很多种类别,比如IoSchedulerNewThreadSchedulerSingleSchedulerComputationScheduler等,需要根据不同的业务场景,合理的选择Scheduler

因此,需要更深层次的理解RxJava,就需要再去扒Scheduler的具体实现,这里caffe准备之后的文章中进行深度分析。

Caffe手写辛苦,麻烦各位大佬点赞关注留言,非常感谢你们的鼓励和支持。下周我们再见!