Spring 5 响应式编程

5,034 阅读26分钟

要点

  • Reactor 是一个运行在 Java8 之上的响应式流框架,它提供了一组响应式风格的 API
  • 除了个别 API 上的区别,它的原理跟 RxJava 很相似
  • 它是第四代响应式框架,支持操作融合,类似 RxJava 2
  • Spring 5 的响应式编程模型主要依赖 Reactor

RxJava 回顾

Reactor 是第四代响应式框架,跟RxJava 2 有些相似。Reactor 项目由Pivotal 启动,以响应式流规范、Java8 和ReactiveX 术语表为基础。它的设计是Reactor 2(上一个主要版本)和RxJava 核心贡献者共同努力的结果。

在之前的同系列文章 RxJava 实例解析测试RxJava 里,我们已经了解了响应式编程的基础:数据流的概念、Observable 类和它的各种操作以及通过工厂方法创建静态和动态的Observable 对象。

Observable 是事件的源头,Observer 提供了一组简单的接口,并通过订阅事件源来消费 Observable 的事件。Observable 通过 onNext 向 Observer 通知事件的到达,后面可能会跟上 onError 或 onComplete 来表示事件的结束。

RxJava 提供了 TestSubscriber 来测试 Observable,TestSubscriber 是一个特别的 Observer,可以用它断言流事件。

在这篇文章里,我们将会对 ReactorRxJava 进行比较,包括它们的相同点和不同点。

Reactor 的类型

Reactor 有两种类型, FluxMono

  • Flux 类似 RaxJava 的 Observable,它可以触发零到多个事件,并根据实际情况结束处理或触发错误。
  • Mono 最多只触发一个事件,它跟 RxJava 的 Single 和 Maybe 类似,所以可以把 Mono 用于在异步任务完成时发出通知。

因为这两种类型之间的简单区别,我们可以很容易地区分响应式 API 的类型:从返回的类型我们就可以知道一个方法会发射并忘记请求并等待(Mono),还是在处理一个包含多个数据项的流(Flux)。

FluxMono 的一些操作利用了这个特点在这两种类型间互相转换。例如,调用 Flux 的 single() 方法将返回一个 Mono,而使用 concatWith() 方法把两个 Mono 串在一起就可以得到一个 Flux。类似地,有些操作对 Mono 来说毫无意义(例如 take(n) 会得到 n>1 的结果),而有些操作只有作用在 Mono 上才有意义(例如 or(otherMono))。

Reactor 设计的原则之一是要保持 API 的精简,而对这两种响应式类型的分离,是表现力与 API 易用性之间的折中。

使用响应式流,基于 Rx 构建

正如RxJava 实例解析里所说的,从设计概念方面来看,RxJava 有点类似 Java 8 Streams API。而 Reactor 看起来有点像 RxJava,不过这决不只是个巧合。这样的设计是为了能够给复杂的异步逻辑提供一套原生的具有 Rx 操作风格的响应式流 API。所以说 Reactor 扎根于响应式流,同时在 API 方面尽可能地与 RxJava 靠拢。

响应式类库和响应式流的使用

Reactive Streams (以下简称为 RS)是一种规范,它为基于非阻塞回压的异步流处理提供了标准。它是一组包含了 TCK 工具套件和四个简单接口(Publisher、Subscriber、Subscription 和 Processor)的规范,这些接口将被集成到 Java 9.

RS 主要跟响应式回压(稍后会详细介绍)以及多个响应式事件源之间的交互操作有关。它并不提供任何操作方法,它只关注流的生命周期。

Reactor 不同于其它框架的最关键一点就是 RS。Flux 和 Mono 这两者都是 RS 的 Publisher 实现,它们都具备了响应式回压的特点。

RxJava 1 里,只有少部分操作支持回压,RxJava 1 的 Observable 并没有实现 RS 里的任何类型,不过它有一些 RS 类型的适配器。可以说,RxJava 1 实际上比 RS 规范出现得更早,而且在 RS 规范设计期间,RxJava 1 充当了函数式工作者的角色。

所以,你在使用那些 Publisher 适配器时,它们并不会为你提供任何操作。为了能做一些有用的操作,你可能需要用回 Observable,而这个时候你需要另一个适配器。这种视觉上的混乱会破坏代码的可读性,特别是像 Spring 5 这样的框架,如果整个框架建立在这样的 Publisher 之上,那么就更是杂乱不堪。

RS 规范不支持 null 值,所以在从 RxJava 1 迁移到 Reactor 或 RxJava 2 时要注意这点。如果你在代码里把 null 用作特殊用途,那么就更是要注意了。

RxJava 2 是在 RS 规范之后出现的,所以它直接在 Flowable 类型里实现了 Publisher。不过除了 RS 类型,RxJava 2 还保留了 RxJava 1 的遗留类型(Observable、Completable 和 Single)并且引入了其它一些可选类型——Maybe。这些类型提供了不同的语义,不过它们并没有实现 RS 接口,这是它们的不足之处。跟 RxJava 1 不一样,RxJava 2 的 Observable 不支持 RxJava 2 的回压协议(只有 Flowable 具备这个特性)。之所以这样设计是为了能够为一些场景提供一组丰富且流畅的 API,比如用户界面发出的事件,在这样的场景里是不需要用到回压的,而且也不可能用到。Completable、Single 和 Maybe 不需要支持回压,不过它们也提供了一组丰富的 API,而且在被订阅之前不会做任何事情。

在响应式领域,Reactor 变得愈加精益,它的 Mono 和 Flux 两种类型都实现了 Publisher,并且都支持回压。虽然把 Mono 作为一个 Publisher 需要付出一些额外的开销,不过 Mono 在其它方面的优势弥补了它的缺点。在后续部分我们将看到对 Mono 来说回压意味着什么。

相比 RxJava,API 相似但不相同

ReactiveX 和 RxJava 的操作术语表有时候真的难以掌握,因为历史原因,有些操作的名字让人感到困惑。Reactor 尽量把 API 设计得紧凑,在给 API 取名时尽量选择好一点的名字,不过总的来说,这两套 API 看起来还是很相像。在最新的 RxJava 2 迭代版本中,RxJava 2 借鉴了 Reactor 的一些术语,这预示着这两个项目之间可能会有越来越紧密的合作。一些操作和概念总是先出现在其中的一个项目里,然后互相借鉴,最后会同时渗透到两个项目里。

例如,Flux 也有常见的 just 工厂方法(虽然只有两种变形:接受一个参数或变长参数)。不过 from 方法有很多个变种,最值得一提的是 fromIterable。当然,Flux 也包含了那些常规的操作:map、merge、concat、flatMap、take,等等。

Reactor 把 RxJava 里令人困惑的 amb 操作改成了看起来更加中肯的 firstEmitting。另外,为了保持 API 的一致,toList 被重新命名为 collectList。实际上,所有以 collect 开头的操作都会把值聚合到一个特定类型的集合里,不过只会为每个集合生成一个 Mono。而所有以 to 开头的操作被保留用于类型转换,转换之后的类型可以用于非响应式编程,例如 toFuture()。

在类初始化和资源使用方面,Reactor 之所以也能表现得如此精益,要得益于它的融合特性:Reactor 可以把多个串行的操作(例如调用 concatWith 两次)合并成单个操作,这样就可以只对这个操作的内部类做一次初始化(也就是 macro-fusion)。这个特性包含了基于数据源的优化,抵消了 Mono 在实现 Publisher 时的一些额外开销。它还能在多个相关的操作之间共享资源(也就是 micro-fusion),比如内部队列。这些特性让 Reactor 成为不折不扣的的第四代响应式框架,不过这个超出了这篇文章的讨论范围。

下面让我们来看看几个 Reactor 的操作。

一些操作示例

(这一小节包含了一些代码片段,我们建议你动手去运行它们,深入体验一下 Reactor。所以你需要打开 IDE,并创建一个测试项目,把 Reactor 加入到依赖项里。)

对于 Maven,可以把下面的依赖加到 pom.xml 里:

<dependency>
    <groupId>io.projectreactor</groupId>    
    <artifactId>reactor-core</artifactId>
    <version>3.0.3.RELEASE</version>
</dependency>

对于 Gradle,要把 Reactor 作为依赖项,类似这样:

dependencies {
    compile "io.projectreactor:reactor-core:3.0.3.RELEASE"
}

我们来重写前面几篇同系列文章里的例子!

Observable 的创建跟在 RxJava 里有点类似,在 Reactor 里可以使用 just(T...) 和 fromIterator(Iterable) 工厂方法来创建。just 方法会把 List 作为一个整体触发,而 fromIterable 会逐个触发 List 里的每个元素:

public class ReactorSnippets {
  private static List<String> words = Arrays.asList(
        "the",
        "quick",
        "brown",
        "fox",
        "jumped",
        "over",
        "the",
        "lazy",
        "dog"
        );

  @Test
  public void simpleCreation() {
     Flux<String> fewWords = Flux.just("Hello", "World");
     Flux<String> manyWords = Flux.fromIterable(words);

     fewWords.subscribe(System.out::println);
     System.out.println();
     manyWords.subscribe(System.out::println);
  }
}

跟在 RxJava 里一样,上面的代码会打印出:

Hello
World
the
quick
brown
fox
jumped
over
the
lazy
dog

为了打印句子里的每一个字母,我们还需要 flatMap 方法(跟在 RxJava 里一样),不过在 Reactor 里我们使用 fromArray 来代替 from。然后我们会用 distinct 过滤掉重复的字母,并用 sort 对它们进行排序。最后,我们使用 zipWith 和 range 输出每个字母的次序:

@Test
public void findingMissingLetter() {
  Flux<String> manyLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
              (string, count) -> String.format("%2d. %s", count, string));

  manyLetters.subscribe(System.out::println);
}

我们可以很容易地看到s被遗漏了:

1. a
2. b
...
18. r
19. t
20. u
...
25. z

我们可以通过纠正单词数组来修复这个问题,不过也可以使用 concat/concatWith 和一个 Mono 来手动往字母 Flux 里添加“s”:

@Test
public void restoringMissingLetter() {
  Mono<String> missing = Mono.just("s");
  Flux<String> allLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .concatWith(missing)
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
              (string, count) -> String.format("%2d. %s", count, string));

  allLetters.subscribe(System.out::println);
}

这样,在去重和排序后,遗漏的 s 字母就被添加进来了:

1. a
2. b
...
18. r
19. s
20. t
...
26. z

上一篇文章提到了 Rx 和 Streams API 之间的相似之处,而实际上,在数据就绪的时候,Reactor 也会像 Java Steams 那样开始简单地推送数据事件(可以参看下面关于回压的内容)。只是在主线程里对事件源进行订阅无法完成更加复杂的异步操作,主要是因为在订阅完成之后,控制权会马上返回到主线程,并退出整个程序。例如:

@Test
public void shortCircuit() {
  Flux<String> helloPauseWorld = 
              Mono.just("Hello")
                  .concatWith(Mono.just("world")
                  .delaySubscriptionMillis(500));

  helloPauseWorld.subscribe(System.out::println);
}

这个单元测试会打印出Hello,但无法打印出world,因为程序会过早地退出。在做简单测试的时候,如果你只是像这样写一个简单的主类,你通常会掉入陷阱。作为补救,你可以创建一个 CountDownLatch 对象,并在 Subscriber(包括 onError 和 onComplete)里调用 countDown 方法。不过这样就变得不那么响应式了,不是吗?(万一你忘了调用 countDown 方法,而刚好发生错误了该怎么办?)

解决这个问题的第二种方法是使用一些操作转换到非响应式模式。toItetable 和 toStream 会生成阻塞实例。我们在例子里使用 toStream:

@Test
public void blocks() {
  Flux<String> helloPauseWorld = 
    Mono.just("Hello")
        .concatWith(Mono.just("world")
                        .delaySubscriptionMillis(500));

  helloPauseWorld.toStream()
                 .forEach(System.out::println);
}

正如你所期待的那样,在打印出Hello之后有一个短暂的停顿,然后打印出“world”并退出。我们之前也提过,RxJava 的 amb 操作在 Reactor 里被重命名为 firstEmitting(正如它的名字所表达的:选择第一个 Flux 来触发)。在下面的例子里,我们会创建一个 Mono,这个 Mono 会有 450 毫秒的延迟,还会创建一个 Flux,这个 Flux 以 400 毫秒的间隔触发事件。在使用 firstEmitting() 对它们进行合并时,因为 Flux 的第一个值先于 Mono 的值出现,所以最后 Flux 会被采用:

@Test
public void firstEmitting() {
  Mono<String> a = Mono.just("oops I'm late")
                       .delaySubscriptionMillis(450);
  Flux<String> b = Flux.just("let's get", "the party", "started")
                       .delayMillis(400);

  Flux.firstEmitting(a, b)
      .toIterable()
      .forEach(System.out::println);
}

这个单元测试会打印出句子的所有部分,它们之间有 400 毫秒的时间间隔。

这个时候你可能会想,如果我写的测试使用的是 4000 毫秒的间隔而不是 400 毫秒,那会怎样?你不会想在一个单元测试里等待 4 秒钟的!在后面的部分,我们会看到 Reactor 提供了一些测试工具可以很好地解决这个问题。

我们已经通过例子比较了 Reactor 的一些常用操作,现在我们要回头看看这个框架其它方面的不同点。

基于 Java 8

Reactor 选择 Java 8 作为运行基础而不是之前的任何版本,这再一次与它简化 API 的目标不谋而合:RxJava 选择了 Java 6,而 Java 6 里没有 java.util.function 包,RxJava 也就无法利用这个包下面的 Functino 类和 Consumer 类,所以它必须创建很多类似 Func1、Func2、Action0、Action1 这样的类。RxJava 2 使用类似 Reactor 2 的方式把这些类作为 java.util.function 的镜像,因为它还得支持 Java 7。

Reactor API 还使用了 Java 8 里新引入的一些类型。因为大部分基于时间的操作都跟时间段有关系(例如超时、时间间隔、延迟,等等),所以直接就使用了 Java 8 里的 Duration 类。

Java 8 Stream API 和 CompletableFuture 跟 Flux/Mono 之间可以很容易地进行互相转换。那么一般情况下我们是否要把 Stream 转成 Flux?不一定。虽然说 Flux 或 Mono 对 IO 和内存相关操作的封装所产生的开销微不足道,不过 Stream 本身也并不会带来很大延迟,所以直接使用 Stream API 是没有问题的。对于上述情况,在 RxJava 2 里需要使用 Observable,因为 Observable 不支持回压,所以一旦对其进行订阅,它就成为事件推送的来源。Reactor 是基于 Java 8 的,所以在大部分情况下,Stream API 已经能够满足需求了。要注意的是,尽管 Flux 和 Mono 的工厂模式也支持简单类型,但它们的主要用途还是在于把对象合并到更高层次的流里面。所以一般来说,在现有代码上应用响应式模式时,你不会希望把“long getCount()”这样的方法转成“Mono getCount()”。

关于回压

回压是 RS 规范和 Reactor 主要关注点之一(如果还有其它关注点的话)。回压的原理是说,在一个推送场景里,生产者的生产速度比消费者的消费速度快,消费者会向生产者发出信号说“嘿,慢一点,我处理不过来了。”生产者可以借机控制数据生成的速度,而不是抛弃数据或者冒着产生级联错误的风险继续生成数据。

你也许会想,在 Mono 里为什么也需要回压:什么样的消费者会被一个单独的触发事件压垮?答案是“应该不会有这样的消费者”。不过,在 Mono 和 CompletableFuture 工作原理之间仍然有一个关键的不同点。后者只有推送:如果你持有一个 Future 的引用,那么说明一个异步任务已经在执行了。另一方面,回压的 Flux 或 Mono 会启动延迟的拉取 - 推送迭代:

  1. 延迟是因为在调用 subscribe() 方法之前不会发生任何事情
  2. 拉取是因为在订阅和发出请求时,Subscriber 会向上游发出信号,准备拉取下一个数据块
  3. 接下来生产者向消费者推送数据,这些数据在消费者的请求范围之内

对 Mono 来说,subscribe() 方法就相当于一个按钮,按下它就等于说我准备好接收数据了。Flux 也有一个类似的按钮,不过它是 request(n) 方法,这个方法是 subscribe() 的一般化用法。

Mono 作为一个 Publisher,它往往代表着一个耗费资源的任务(在 IO、延迟等方面),意识到这点是理解回压的关键:如果不对其进行订阅,你就不需要为之付出任何代价。因为 Mono 经常跟具有回压的 Flux 一起被编排到一个响应式链上,来自多个异步数据源的结果有可能被组合到一起,这种按需触发的能力是避免阻塞的关键。

我们可以使用回压来区分 Mono 的不同使用场景,相比上述的例子,Mono 有另外一个常见的使用场景:把 Flux 的数据异步地聚合到 Mono 里。reduce 和 hasElement 可以消费 Flux 里的每一个元素,再把这些数据以某种形式聚合起来(分别是 reduce 函数的调用结果和一个 boolean 值),作为一个 Mono 对外暴露数据。在这种情况下,使用 Long.MAX_VALUE 向上游发出回压信号,上游会以完全推送的方式工作。

关于回压另一个有意思的话题是它如何对存储在内存里的流的对象数量进行限制。作为一个 Publisher,数据源很有可能出现生成数据缓慢的问题,而来自下游的请求超出了可用数据项。在这种情况下,整个流很自然地进入到推送模式,消费者会在有新数据到达时收到通知。当生产高峰来临,或者在生产速度加快的情况下,整个流又回到了拉取模式。在以上两种情况下,最多有 N 项数据(request() 请求的数据量)会被保留在内存里。

你可以对内存的使用情况进行更精确的推算,把 N 项数据跟每项数据需要消耗的内存 W 结合起来:这样你就可以推算出最多需要消耗 W*N 的内存。实际上,Reactor 在大多数情况下会根据 N 来做出优化:根据情况创建内部队列,并应用预取策略,每次自动请求 75% 的数据量。

Reactor 的操作有时候会根据它们所代表的语义和调用者的期望来改变回压信号。例如对于操作 buffer(10):下游请求 N 项数据,而这个操作会向上游请求 10N 的数据量,这样就可以填满缓冲区,为订阅者提供足够的数据。这通常被称为“主动式回压”,开发人员可以充分利用这种特性,例如在微批次场景里,可以显式地告诉 Reactor 该如何从一个输入源切换到一个输出地。

跟 Spring 的关系

Reactor 是 Spring 整个生态系统的基础,特别是 Spring 5(通过 Spring Web Reactive)和 Spring Data “kay”(跟 spring-data-commons 2.0 相对应的)。

这两个项目的响应式版本是非常有用的,我们因此可以开发出完全响应式的 Web 应用:异步地处理请求,一直到数据库,最后异步地返回结果。Spring 应用因此可以更有效地利用资源,避免为每个请求单独分配一个线程,还要等待 I/O 阻塞。

Reactor 将被用于未来 Spring 应用的内部响应式核心组件,以及这些 Spring 组件暴露出来的 API。一般情况下,它们可以处理 RS Publisher,不过大多数时候它们要面对的是 Flux/Mono,需要用到 Reactor 的丰富特性。当然,你也可以自行选择其它响应式框架,Reactor 提供了可以用来适配其它 Reactor 类型和 RxJava 类型甚至简单的 RS 类型的钩子接口。

目前,你可以通过 Spring Boot 2.0.0.BUILD-SNAPSHOT 和 spring-boot-starter-web-reactive 依赖项(可以在 start.spring.io 上生成一个这样的项目)来体验 Spring Web Reactive:

<dependency>
  <groupId>org.springframework.boot.experimental</groupId>
  <artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>

你可以像往常那样写你的 @Controller,只不过把 Spring MVC 的底层变成响应式的,把大部分 Spring MVC 的契约换成了响应式非阻塞的契约。响应式层默认运行在 Tomcat 8.5 上,你也可以选择使用 Undertow 或 Netty。

{% asset_img 1.jpg %}

另外,虽然 Spring API 是以 Reactor 类型为基础的,不过在 Spring Web Reactive 模块里可以为请求和响应使用各种各样的响应式类型:

  • Mono:作为 @RequestBody,请求实体 T 会被异步反序列化,之后的处理可以跟 Mono 关联起来。作为返回类型,每次 Mono 发出了一个值,T 就会被异步序列化并发回客户端。你可以把请求 Mono 作为参数,并把参数化了的关联处理作为结果 Mono 返回。
  • Flux:在流场景里使用(作为 @RequestBody 使用的输入流以及包含了 Flux 返回类型的 Server Sent Events)。
  • Single/Observable:分别对应 Mono 和 Flux,不过会切换回 RxJava。
  • Mono 作为返回类型:在 Mono 结束时请求的处理也跟着完成。
  • 非响应式返回类型(void 和 T):这个时候你的 @Controller 方法是同步的,不过它应该是非阻塞的(短暂的处理)。请求处理在方法执行完毕时结束,返回的 T 被异步地序列化并发回客户端。

下面是使用 Spring Web Reactive 的例子:

@Controller
public class ExampleController {

  private final MyReactiveLibrary reactiveLibrary;

  public ExampleController(@Autowired MyReactiveLibrary reactiveLibrary) {
     this.reactiveLibrary = reactiveLibrary;
  }

  @RequestMapping("hello/{who}")
  @ResponseBody
  public Mono<String> hello(@PathVariable String who) {
       return Mono.just(who)
             .map(w -> "Hello " + w + "!");
  }

  @RequestMapping(value = "heyMister", method = RequestMethod.POST)
  @ResponseBody
  public Flux<String> hey(@RequestBody Mono<Sir> body) {
      return Mono.just("Hey mister ")
            .concatWith(body
                  .flatMap(sir -> Flux.fromArray(sir.getLastName().split("")))
                  .map(String::toUpperCase)
                  .take(1)
            ).concatWith(Mono.just(". how are you?"));
  }
}

第一个端点含有一个路径变量,它被转成 Mono,并被映射到一个问候语里返回给客户端。

一个发到 /hello/SImon 的 GET 请求会得到“Hello Simon!”的文本响应。

第二个端点相对复杂一些:它异步地接收序列化 Sir 对象(一个包含了 firstName 和 lastName 属性的类)并使用 flatMap 方法把它映射到一个字母流里,这个字母流包含了 lastName 的所有字母。然后它选取流里的第一个字母,把它转成大写,并跟问候语串在一起。

所以向 /heyMister POST 一个 JSON 对象

{
    "firstName": "Paul",
    "lastName": "tEsT"
}

会返回字符串“Hello mister T. How are you?”。

响应式 Spring Data 目前也在开发当中,它被作为 Kay 发布的一部分,代码在 spring-data-commons 2.0.x 分支上。现在已经有一个里程碑版本可以使用:

<dependencyManagement>
  <dependencies>
     <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-releasetrain</artifactId>
        <version>Kay-M1</version>
        <scope>import</scope>
        <type>pom</type>
     </dependency>
  </dependencies>
</dependencyManagement>

然后简单地添加 Spring Data Commons 的依赖(它会自动从上面的 BOM 里获取版本号):

<dependency>
  <groupId>org.springframework.data</groupId>
  <artifactId>spring-data-commons</artifactId>
</dependency>

Spring Data 对响应式的支持主要表现在新的 ReactiveCrudRepository 接口,它扩展了 Repository。这个接口暴露了 CRUD 方法,使用的是 Reactor 类型的输入和返回值。还有一个 RxJava 1 的版本,叫作 RxJava1CrudRepository。要在 CrudRepository 里通过 id 获取一个实体,可以调用“T findOne(ID id)”方法,而在 ReactiveCrudRepository 和 RxJava1CrudRepository 里要分别调用“Mono findOne(ID id)”和“Observable findOne(ID id)”。还有其它的变种,它们接收 Mono/Single 作为参数,异步地提供 key,并在此基础上组合返回结果。

假设有一个响应式的后端存储(或者 mock 的 ReactiveCrudRepository bean),下面的 controller 将从前到后都是响应式的:

@Controller
public class DataExampleController {

  private final ReactiveCrudRepository<Sir, String> reactiveRepository;

  public DataExampleController(
                 @Autowired ReactiveCrudRepository<Sir, String> repo) {
     this.reactiveRepository = repo;
  }

  @RequestMapping("data/{who}")
  @ResponseBody
  public Mono<ResponseEntity<Sir>> hello(@PathVariable String who) {
     return reactiveRepository.findOne(who)
                              .map(ResponseEntity::ok)
                              .defaultIfEmpty(ResponseEntity.status(404)
                                                            .body(null));
  }
}

请注意整个流程:我们异步地获取实体并用 map 把它包装成 ResponseEntity,取得一个可以马上返回的 Mono。如果 Spring Data repository 找不到这个 key 的数据,会返回一个空的 Mono。我们使用 defaultIfEmpty 显式地返回 404。

测试 Reactor

测试 RxJava这篇文章里提到了如何测试 Observable。正如我们所看到的,RxJava 提供了 TestScheduler,我们可以把它跟 RxJava 的操作一起使用,这些操作接受一个 Scheduler 参数,TestScheduler 会为这些操作启动虚拟的时钟。RxJava 还提供了一个 TestSubscriber 类,可以用它等待 Observable 执行完毕,也可以用它对每个事件进行断言(onNext 的值和它的数量、触发的 onError,等等)。在 RxJava 2 里,TestSubscriber 就是 RS Subscriber,你可以用它测试 Reactor 的 Flux 和 Mono!

在 Reactor 里,上述两个使用广泛的特性被组合到了 StepVerifier 类里。从 reactor-addons 仓库的 reactor-test 模块里可以获取到 StepVerifier。在创建 Publisher 实例时,调用 StepVerifier.create 方法可以初始化一个 StepVerifier。如果要使用虚拟时钟,可以调用 StepVerifier.withVirtualTime 方法,这个方法接受一个 Supplier 作为参数。之所以这样设计,是因为它会首先保证创建一个 VirtualTimeScheduler 对象,并把它作为默认的 Scheduler 传给旧有的操作。StepVerifier 会对在 Supplier 里创建的 Flux/Mono 进行配置,把基于时间的操作转为“虚拟时间操作”。接下来你就可以编写各种你所期望的用例:下一个元素应该是什么,是否应该出现错误,是否应该及时向前移动,等等。借助其它方法,比如事件与 Predicate 的匹配或者对 onNext 事件的消费,你可以与那些值之间做一些更高级的交互(犹如在使用断言框架)。任何地方抛出的 AssertionError 都会在最终的结果里反应出来。最后,调用 verify() 对你的用例进行测试,这个方法会通过 StepVerifier.create 或 StepVerifier.withVirtualTime 方法对预定义的事件源进行订阅。

让我们来举一些简单的例子来说明 StepVerifier 时如何工作的。首先要添加依赖到 POM 里:

<dependency>
  <groupId>io.projectreactor.addons</groupId>
  <artifactId>reactor-test</artifactId>
  <version>3.0.3.RELEASE</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.assertj</groupId>
  <artifactId>assertj-core</artifactId>
  <version>3.5.2</version>
  <scope>test</scope>
</dependency>

假设你有一个叫作 MyReactiveLibrary 的类,你要对这个类生成的一些 Flux 进行测试:

@Component
public class MyReactiveLibrary {

  public Flux<String> alphabet5(char from) {
     return Flux.range((int) from, 5)
           .map(i -> "" + (char) i.intValue());
  }

  public Mono<String> withDelay(String value, int delaySeconds) {
     return Mono.just(value)
                .delaySubscription(Duration.ofSeconds(delaySeconds));
  }
}

第一个方法将返回给定字母之后的 5 个字母。第二个方法返回一个 Flux,它会以给定的时间间隔触发给定值,其中的时间间隔以秒为单位。第一个测试是要保证使用 x 调用 alphabet5 的输出被限定在 x、y、z。使用 StepVerifier 看起来是这样的:

@Test
public void testAlphabet5LimitsToZ() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  StepVerifier.create(library.alphabet5('x'))
        .expectNext("x", "y", "z")
        .expectComplete()
        .verify();
}

第二个测试要保证 alphabet5 返回的每个值都是字母。在这里我们使用断言框架 AssertJ :

@Test
public void testAlphabet5LastItemIsAlphabeticalChar() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  StepVerifier.create(library.alphabet5('x'))
              .consumeNextWith(c -> assertThat(c)
                    .as("first is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("second is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("third is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("fourth is alphabetic").matches("[a-z]"))
              .expectComplete()
              .verify();
}

结果这些测试都运行失败。让我们检查一下 StepVirifier 的输出,看看能不能找出 bug:

java.lang.AssertionError: expected: onComplete(); actual: onNext({)

java.lang.AssertionError: [fourth is alphabetic] 
Expecting:
 "{"
to match pattern:
 "[a-z]"

看起来我们的方法并没有在 z 的时候停住,而是继续发出 ASCII 字符。我们可以加入.take(Math.min(5,'z'-from+1)) 来修复这个 bug,或者把 Math.min 作为 range 的第二个参数。

我们要做的最后一个测试需要用到虚拟时钟:我们使用 withVirtualTime 构造器来测试方法的延迟,而不需要真的等待指定的时间:

@Test
public void testWithDelay() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  Duration testDuration =
     StepVerifier.withVirtualTime(() -> library.withDelay("foo", 30))
                 .expectSubscription()
                 .thenAwait(Duration.ofSeconds(10))
                 .expectNoEvent(Duration.ofSeconds(10))
                 .thenAwait(Duration.ofSeconds(10))
                 .expectNext("foo")
                 .expectComplete()
                 .verify();
  System.out.println(testDuration.toMillis() + "ms");
}

这个测试用例测试一个将被延迟 30 秒的 Flux:在订阅之后的 30 秒内不会发生任何事情,然后发生一个 onNext("foo") 事件后结束。

System.out 会打印出验证所需要的时间,在最近的一次测试中它用掉了 8 毫秒。

如果调用构造器的 create 方法,thenAwait 和 expectNoEvent 方法仍然可以使用,不过它们会阻塞指定的时间。

自定义动态源

RxJava 实例解析一文中提到的动态和静态 Observable 对 Reactor 来说也是适用的。

如果你要创建一个自定义的 Flux,需要使用 Reactor 的 FluxSink。这个类将会为你考虑所有跟异步有关的情况,你只需要把注意力集中在触发事件上。

使用 Flux.create 并从回调中获得的 FluxSink 可以用于后续的触发事件。这个自定义的 Flux 是静态的,为了把它变成动态的,可以使用 publish() 和 connect() 方法。基于上一篇文章中的例子,我们几乎可以把它逐字逐句地翻译成 Reactor 的版本:

SomeFeed<PriceTick> feed = new SomeFeed<>();
Flux<PriceTick> flux =
     Flux.create(emitter ->
     {
        SomeListener listener = new SomeListener() {
           @Override
           public void priceTick(PriceTick event) {
              emitter.next(event);
              if (event.isLast()) {
                 emitter.complete();
              }
           }

           @Override
           public void error(Throwable e) {
              emitter.error(e);
           }};
        feed.register(listener);
     }, FluxSink.OverflowStrategy.BUFFER);

ConnectableFlux<PriceTick> hot = flux.publish();

在连接到动态 Flux 之前,可以做两次订阅:一个订阅将打印每个 tick 的细节,另一个订阅会打印出 instrument:

hot.subscribe(priceTick -> System.out.printf("%s %4s %6.2f%n", priceTick
     .getDate(), priceTick.getInstrument(), priceTick.getPrice()));

hot.subscribe(priceTick -> System.out.println(priceTick.getInstrument()));

接下来我们连接到动态 Flux,并在程序结束前让它运行 5 秒钟:

hot.connect();
Thread.sleep(5000);

(要注意,如果 PriceTick 的 isLast() 方法改变了,那么 feed 本身也会结束)。

FluxSink 通过 isCancelled() 来检查下游的订阅是否已取消。你还可以通过 requestedFromDownstream() 来获得请求数,这个在遵循回压策略时很管用。最后,你可以通过 setCancellation 方法释放所有使用过的资源。

要注意,FluxSink 使用了回压,所以你必须提供一个 OverflowStrategy 来显式地处理回压。这个等价于使用 onBackpressureXXX 操作(例如,FluxSink.OverflowStrategy.BUFFER 等价于.onBackpressureBuffer()),它们会覆盖来自下游的回压信号。

结论

在这篇文章里,我们学习了 Reactor,一个运行在 Java 8 之上并以 Rx 规范和 Reactive Streams 规范为基础的第四代响应式框架。我们展示了 RxJava 中的设计理念是如何被应用在 Reactor 上的,尽管它们之间有一些 API 设计上的差别。我们还展示了 Reactor 如何成为 Spring 5 的基础,还提供了一些跟测试 Publisher、Flux 和 Mono 有关的资源。

欢迎访问我的个人博客

关注公众号:JAVA九点半课堂,这里有一批优秀的程序猿,加入我们,一起探讨技术,共同进步!回复“资料”获取 2T 行业最新资料!