响应式编程 Reactor 学习小记

6,148 阅读13分钟

从响应式编程说起

响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。

在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在 JVM 上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(Flow 类)。

响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。

  • iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。
  • 响应式流中,相对应的角色是 Publisher-Subscriber,但是当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键

此外,对推送来的数据的操作是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过描述“控制流程”来定义对数据流的处理逻辑。

除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:

onNext x 0..N [onError | onComplete]

这种方式非常灵活,无论是有/没有值,还是 n 个值(包括有无限个值的流,比如时钟的持续读秒),都可处理。

以上来自 projectreactor.io/docs/core/r… 翻译

Reactive Streams

Reactive Streams 是上面提到的一套标准的响应式编程规范。它由四个核心概念构成:

  • 消息发布者:只有一个 subscribe 接口,是订阅者调用的,用来订阅发布者的消息。发布者在订阅者调用 request 之后把消息 push 给订阅者。
    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
    
  • 订阅者:订阅者包括四个接口,这些接口都由 Publisher 触发调用的。onSubscribe 告诉订阅者订阅成功,并返回了一个 Subscription ;通过 Subscription 订阅者可以告诉发布者发送指定数量的消息(request 完成) ;onNext 是发布者有消息时,调用订阅者这个接口来达到发布消息的目的;onError 通知订阅者,发布者出现了错误;onComplete 通知订阅者消息发送完毕。
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }
    
  • 订阅:包括两个接口,请求 n 个消息和取消此次订阅。
    public interface Subscription {
        // request(n)用来发起请求数据,其中n表示请求数据的数量,它必须大于0,
        // 否则会抛出IllegalArgumentException,并触发onError,request的调用会
        // 累加,如果没有终止,最后会触发相应次数的onNext方法.
        public void request(long n);
        // cancel相当于取消订阅,调用之后,后续不会再收到订阅,onError 和 
        // onComplete也不会被触发
        public void cancel();
    }
    
  • 处理器:Processor 同时继承了 Subscriber 和 Publisher;其代表一个处理阶段。
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

Reactive Streams 通过上面的四个核心概念和相关的函数,对响应式流进行了一个框架性的约定,它没有具体实现。简单来说,它只提供通用的、合适的解决方案,大家都按照这个规约来实现就好了。

Java 的 Reactive Programming 类库主要有三个,分别是 Akka-Streams ,RxJava 和 Project Reactor。Spring 5 开始支持 Reactive Programming,其底层使用的是 Project Reactor。本篇主要是对 Project Reactor 中的一些点进行学习总结。

Project Reactor

Project Reactor 是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。

Reactor 引入了实现 Publisher 的响应式类 Flux 和 Mono,以及丰富的操作方式。 一个 Flux 对象代表一个包含 0..N 个元素的响应式序列,而一个 Mono 对象代表一个包含零或者一个(0..1)元素的结果。

Flux 和 Mono

Flux 是生产者,即我们上面提到的 Publisher,它代表的是一个包含 0-N 个元素的异步序列,Mono可以看做 Flux 的有一个特例,代表 0-1 个元素,如果不需要生产任何元素,只是需要一个完成任务的信号,可以使用 Mono。

Flux-包含 0-N 个元素的异步序列

Flux

先来看这张图,这里是直接从官方文档上贴过来的。就这张图做下说明,先来关注几个点:

  • 从左到右的时间序列轴
  • 1-6 为 Flux enitted(发射)的元素
  • 上面 6 后面的竖线标识已经成功完成了
  • 下面的 1-3 表示转换的结果
  • ❌ 表示出现了error,对应的是执行了onError
  • operator : 操作符,声明式的可组装的响应式方法,其组装成的链称为“操作链”

那整体来看就是 Flux 产生元数据,通过一系列 operator 操作得到转换结果,正常成功就是 onCompleted,出现错误就是 onError。看下面的一个小例子:

Flux.just("glmapper","leishu").subscribe(new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription subscription) {
        // subscription 表示订阅关系
        System.out.println("onSubscribe,"+ subscription.getClass());
        // subscription 通过 request 来触发 onNext
        subscription.request(2);
    }
    @Override
    public void onNext(String s) {
        System.out.println("currrent value is = " + s);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println("it's error.");
    }
    @Override
    public void onComplete() {
        System.out.println("it's completed.");
    }
});

执行结果:

onSubscribe,class reactor.core.publisher.StrictSubscriber
currrent value is = glmapper
currrent value is = leishu
it's completed.

如果在 onSubscribe 方法中我们不执行 request,则不会有后续任何操作。关于 request 下面看。

Flux 是一个能够发出 0 到 N 个元素的标准的 Publisher,它会被一个 "error" 或 "completion" 信号终止。因此,一个 Flux 的结果可能是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的 onNextonCompleteonError方法。

Mono-异步的 0-1 结果

Mono

这张图也来自官方文档,和上面 Flux 的区别就是,Mono 最多只能 emitted 一个元素。

Mono.just("glmapper").subscribe(System.out::println);

小结

通过上面两段小的代码来看,最直观的感受是,Flux 相当于一个 List,Mono 相当于 Optional。其实在编程中所有的结果我们都可以用 List 来 表示,但是当只返回一个或者没有结果时,用 Optional 可能会更精确些。

Optional 相关概念可自行搜索 jdk Optional

另外,Mono 和 Flux 都提供了一些工厂方法,用于创建相关的实例,这里简单罗列一下:

// 可以指定序列中包含的全部元素。创建出来的 Flux 
// 序列在发布这些元素之后会自动结束。
Flux.just("glmapper", "leishu");
// 从一个Iterable 对象中创建 Flux 对象,当然还可以是数组、Stream对象等
Flux.fromIterable(Arrays.asList("glmapper","leishu"));
// 创建一个只包含错误消息的序列。
Flux.error(new IllegalStateException());
// 创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间
// 隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
Flux.interval(Duration.ofMillis(100)).take(10);
// 创建一个不包含任何消息通知的序列。
Flux.never();
// 创建一个不包含任何元素,只发布结束消息的序列。
Flux.empty(); 
// 创建包含从 start 起始的 count 个数量的 Integer 对象的序列
Flux.range(int start, int count);
// Mono 同上
Mono.empty();
Mono.never();
Mono.just("glmapper");
Mono.error(new IllegalStateException());

上面的这些静态方法适合于简单的序列生成,当序列的生成需要复杂的逻辑时,则应该使用 generate() 或 create() 方法。

一些概念

  • Operator:Operator 是一系列函数式的便捷操作,可以链式调用。所有函数调用基本都 是 Reactor 的 Operator ,比如 just,map,flatMap,filter 等。
  • Processor:上面从 Processor 的接口定义可以看出,它既是一个 Subscriber,又是一个 Publisher;Processor 夹在第一个 Publisher 和最后一个 Subscriber 中间,对数据进行处理。有点类似 stream 里的 map,filter 等方法。具体在数据流转中, Processor 以 Subscriber 的身份订阅 Publisher 接受数据,又以 Publisher 的方式接受其它 Subscriber 的订阅,它从自己订阅的 Publisher 收到数据后,做一些处理,然后转发给订阅它的 Subscriber。
  • back pressure:背压。对 MQ 有了解的应该清楚,消息积压一般是在消费端,也就是说生产端只负责生产,并不会关心消费端的消费能力,这样就到导致 pressure 积压在消费端,这个是正向的。从上面对 Reactor 中的一些了解,Subscriber 是主动向 Publisher 请求的,这样当消费端消费的速度没有生产者快时,这些消息还是积压在生产端;这种好处就是生产者可以根据实际情况适当的调整生产消息的速度。
  • Hot VS Cold :参考 Hot VS Cold

核心调用过程

Reactor 的核心调用过程大致可以分为图中的几个阶段

  • 声明:无论是使用 just 或者其他什么方式创建反应式流,这个过程都可以称之为声明,因为此时这些代码不会被实际的执行。
  • subscribe:当调用 subscribe 时,整个执行过程便进入 subscribe 阶段,经过一系列的调用之后,subscribe 动作会代理给具体的 Flux 来实现。
  • onSubscribe:onSubscribe 阶段指的是 Subscriber#onSubscribe 方法被依次调用的阶段。这个阶段会让各 Subscriber 知道 subscribe 方法已被触发,真正的处理流程马上就要开始。
  • request:onSubscribe 阶段是表示订阅动作的方式,让各 Subscriber 知悉,准备开始处理数据。当最终的 Subscriber 做好处理数据的准备之后,它便会调用 Subscription 的 request 方法请求数据。
  • onNext:通过调用 Subscriber 的 onNext 方法,进行真正的响应式的数据处理。
  • onComplete:成功的终端状态,没有进一步的事件将被发送。
  • onError:错误的终端状态(和 onComplete 一样,当发生时,后面的将不会在继续执行)。

消息处理

当需要处理 Flux 或 Mono 中的消息时,可以通过 subscribe 方法来添加相应的订阅逻辑。在调用 subscribe 方法时可以指定需要处理的消息类型。可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。

通过 subscribe() 方法处理正常和错误消息

 Flux.just(1, 2)
    .concatWith(Mono.error(new IllegalStateException()))
    .subscribe(System.out::println, System.err::println);

结果:

1
2
java.lang.IllegalStateException

正常的消息处理相对简单。当出现错误时,有多种不同的处理策略:

  • 通过 onErrorReturn() 方法返回一个默认值

    Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .onErrorReturn(0)
        .subscribe(System.out::println);
    

    结果:

    1
    2
    0
    
  • 通过 onErrorResume()方法来根据不同的异常类型来选择要使用的产生元素的流

     Flux.just(1, 2)
            .concatWith(Mono.error(new IllegalArgumentException()))
            .onErrorResume(e -> {
                if (e instanceof IllegalStateException) {
                    return Mono.just(0);
                } else if (e instanceof IllegalArgumentException) {
                    return Mono.just(-1);
                }
                return Mono.empty();
                }).subscribe(System.out::println);
    

    结果:

    1
    2
    -1
    
  • 通过 retry 操作符来进行重试,重试的动作是通过重新订阅序列来实现的。在使用 retry 操作符时可以指定重试的次数。

    Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .retry(1)
        .subscribe(System.out::println);
    

    结果:

    1
    2
    1
    2
    Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException
    Caused by: java.lang.IllegalStateException
    	at com.glmapper.bridge.boot.reactor.SimpleTest.testFluxSub(SimpleTest.java:75)
    	at com.glmapper.bridge.boot.reactor.SimpleTest.main(SimpleTest.java:23)
    

调度器 Scheduler

在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler,Scheduler 是一个拥有广泛实现类的抽象接口,Schedulers 类提供的静态方法用于达成如下的执行环境:

  • 当前线程(Schedulers.immediate())
     Schedulers.immediate().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
     });
     
     // main-11
    
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器(Scheduler)被废弃。如果你想使用专一的线程,就对每一个调用使用 Schedulers.newSingle()。
    Schedulers.single().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // single-1-11
    
  • 弹性线程池(Schedulers.elastic()。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。 Schedulers.elastic() 能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源。
    Schedulers.elastic().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // elastic-2-11
    
  • 固定大小线程池(Schedulers.parallel())。所创建线程池的大小与 CPU 个数等同
    Schedulers.parallel().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // parallel-1-11
    
  • 基于现有的 ExecutorService 创建 Scheduler
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Schedulers.fromExecutorService(executorService).schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
            
    // pool-4-thread-1-11
    
  • 基于 newXXX 方法来创建调度器
    Schedulers.newElastic("test-elastic").schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // test-elastic-4-11
    

一些操作符默认会使用一个指定的调度器(通常也允许开发者调整为其他调度器)例如, 通过工厂方法 Flux.interval(Duration.ofMillis(100)) 生成的每 100ms 打点一次的 Flux, 默认情况下使用的是 Schedulers.parallel(),下边的代码演示了如何将其装换为 Schedulers.single()

Flux<String> intervalResult = Flux.interval(Duration.ofMillis(100),
        Schedulers.newSingle("test"))
        .map(i -> Thread.currentThread().getName() +"@"+i);
        intervalResult.subscribe(System.out::println);

结果:

test-1@0
test-1@1
test-1@2
test-1@3
test-1@4
// 省略

publishOn 和 subscribeOn

Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOn 和 subscribeOn。 它们都接受一个 Scheduler 作为参数,从而可以改变调度器。但是 publishOn 在链中出现的位置是有讲究的,而 subscribeOn 则无所谓。

  • publishOn 的用法和处于订阅链(subscriber chain)中的其他操作符一样。它将上游 信号传给下游,同时执行指定的调度器 Scheduler 的某个工作线程上的回调。 它会 改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)
  • subscribeOn 用于订阅(subscription)过程,作用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。所以,无论你把 subscribeOn 至于操作链的什么位置, 它都会影响到源头的线程执行环境(context)。 但是,它不会影响到后续的 publishOn,后者仍能够切换其后操作符的线程执行环境。
Flux.create(sink -> {
        sink.next(Thread.currentThread().getName());
        sink.complete();
    })
    .publishOn(Schedulers.single())
    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
    .publishOn(Schedulers.elastic())
    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
    .subscribeOn(Schedulers.parallel())
    .toStream()
    .forEach(System.out::println);

结果:

[elastic-2] [single-1] parallel-1

上面这段代码使用 create() 方法创建一个新的 Flux 对象,其中包含唯一的元素是当前线程的名称。

接着是两对 publishOn() 和 map()方法,其作用是先切换执行时的调度器,再把当前的线程名称作为前缀添加。

最后通过 subscribeOn()方法来改变流产生时的执行方式。

最内层的线程名字 parallel-1 来自产生流中元素时使用的 Schedulers.parallel()调度器,中间的线程名称 single-1 来自第一个 map 操作之前的 Schedulers.single() 调度器,最外层的线程名字 elastic-2 来自第二个 map 操作之前的 Schedulers.elastic()调度器。

先到这里,剩下的想到再补充...

参考