聊聊Spring Reactor反应式编程

19,901

前言

为了应对 高并发环境下 的服务端编程,微软提出了一个实现 异步编程 的方案 - Reactive Programming,中文名称 反应式编程。随后,其它技术也迅速地跟上了脚步,像 ES6 通过 Promise 引入了类似的异步编程方式。Java 社区也没有落后很多,NetflixTypeSafe 公司提供了 RxJavaAkka Stream 技术,让 Java 平台也有了能够实现反应式编程的框架。

正文

函数式编程

函数式编程是种编程方式,它将计算机的运算视为函数的计算。函数编程语言最重要的基础是 λ演算 (lambda calculus),而λ演算的函数可以接受函数当作 输入(参数)输出(返回值)lambda 表达式对与大多数程序员已经很熟悉了,jdk8 以及 es6都是引入的 lambda

函数式编程的特点

  • 惰性计算
  • 函数是“第一等公民”
  • 只使用表达式而不使用语句
  • 没有副作用

反应式编程

反应式编程 (reactive programming) 是一种基于 数据流 (data stream)变化传递 (propagation of change)声明式 (declarative) 的编程范式。

反应式编程的特点

1. 事件驱动

在一个 事件驱动 的应用程序中,组件之间的交互是通过松耦合的 生产者 (production)消费者 (consumption) 来实现的。这些事件是以 异步非阻塞 的方式发送和接收的。

事件驱动 的系统依靠 推模式 而不是 拉模式投票表决,即 生产者 是在有消息时才推送数据给 消费者,而不是通过一种浪费资源方式:让 消费者 不断地 轮询等待数据

2. 实时响应

程序发起执行以后,应该 快速 返回存储 结果的上下文,把具体执行交给 后台线程。待处理完成以后,异步地将 真实返回值 封装在此 上下文 中,而不是 阻塞 程序的执行。实时响应是通过 异步 编程实现的,例如:发起调用后,快速返回类似 java8CompletableFuture 对象。

3. 弹性机制

事件驱动的 松散耦合 提供了组件在失败下,可以抓获 完全隔离 的上下文场景,作为 消息封装,发送到下游组件。在具体编程时可以 检查错误 ,比如:是否接收到,接收的命令是否可执行等,并决定如何应对。

Reactor简介

Reactor 框架是 Pivotal 基于 Reactive Programming 思想实现的。它符合 Reactive Streams 规范 (Reactive Streams 是由 NetflixTypeSafePivotal 等公司发起的) 的一项技术。其名字有 反应堆 之意,反映了其背后的强大的 性能

1. Reactive Programming

Reactive Programming,中文称 反应式编程Reactive Programming 是一种 非阻塞事件驱动数据流 的开发方案,使用 函数式编程 的概念来操作数据流,系统中某部分的数据变动后会自动更新其他部分,而且成本极低。

其最早是由微软提出并引入到 .NET 平台中,随后 ES6 也引入了类似的技术。在 Java 平台上,较早采用反应式编程技术的是 Netflix 公司开源的 RxJava 框架。Hystrix 就是以 RxJava 为基础开发的。

反应式编程其实并不神秘,通过与我们熟悉的 迭代器模式 对比,便可了解其基本思想:

事件 Iterable (pull) Observable (push)
获取数据 T next() onNext(T)
发现异常 throws Exception onError(Exception)
处理完成 hasNext() onCompleted()

上面表格的中的 Observable 那一列便代表 反应式编程API 的使用方式。它其实是 观察者模式 的一种延伸。

如果将 迭代器模式 看作是 拉模式,那 观察者模式 便是 推模式

  1. 被订阅者 (Publisher) 主动推送数据给 订阅者 (Subscriber),触发 onNext() 方法。异常和完成时触发另外两个方法。

  2. 被订阅者 (Publisher) 发生异常,则触发 订阅者 (Subscriber)onError() 方法进行异常捕获处理。

  3. 被订阅者 (Publisher) 每次推送都会触发一次 onNext() 方法。所有的推送完成且无异常时,onCompleted() 方法将 在最后 触发一次。

如果 Publisher 发布消息太快了,超过了 Subscriber 的处理速度,那怎么办?这就是 Backpressure 的由来。Reactive Programming 框架需要提供 背压机制,使得 Subscriber 能够控制 消费消息 的速度。

2. Reactive Streams

Java 平台上,Netflix(开发了 RxJava)、TypeSafe(开发了 ScalaAkka)、Pivatol(开发了 SpringReactor)共同制定了一个被称为 Reactive Streams 项目(规范),用于制定反应式编程相关的规范以及接口。

Reactive Streams 由以下几个组件组成:

  • 发布者:发布元素到订阅者
  • 订阅者:消费元素
  • 订阅:在发布者中,订阅被创建时,将与订阅者共享
  • 处理器:发布者与订阅者之间处理数据

其主要的接口有这三个:

  • Publisher
  • Subscriber
  • Subcription

其中,Subcriber 中便包含了上面表格提到的 onNextonErroronCompleted 这三个方法。对于 Reactive Streams,只需要理解其思想就可以,包括基本思想以及 Backpressure 等思想即可。

3. Reactor的主要模块

Reactor 框架主要有两个主要的模块:

  • reactor-core
  • reactor-ipc

前者主要负责 Reactive Programming 相关的 核心 API 的实现,后者负责 高性能网络通信 的实现,目前是基于 Netty 实现的。

4. Reactor的核心类

Reactor 中,经常使用的类并不是很多,主要有以下两个:

  • Mono

Mono 实现了 org.reactivestreams.Publisher 接口,代表 01 个元素的 发布者

  • Flux

Flux 同样实现了 org.reactivestreams.Publisher 接口,代表 0N 个元素的发表者。

  • Scheduler

代表背后驱动反应式流的调度器,通常由各种线程池实现。

5. WebFlux

Spring 5 引入的一个基于 Netty 而不是 Servlet 的高性能的 Web 框架 - Spring WebFlux ,但是使用方式并没有同传统的基于 ServletSpring MVC 有什么大的不同。

WebFluxMVC 接口的示例:

@RequestMapping("/webflux")
@RestController
public class WebFluxTestController {
    @GetMapping("/mono")
    public Mono<Foobar> foobar() {
        return Mono.just(new Foobar());
    }
}

最大的变化就是返回值从 Foobar 所表示的一个对象变为 Mono<Foobar>Flux<Foobar>

6. Reactive Streams、Reactor和WebFlux

上面介绍了 反应式编程 的一些概念。可能读者看到这里有些乱,梳理一下三者的关系:

  1. Reactive Streams 是一套反应式编程 标准规范
  2. Reactor 是基于 Reactive Streams 一套 反应式编程框架
  3. WebFluxReactor 为基础,实现 Web 领域的 反应式编程框架

其实,对于业务开发人员来说,当编写反应式代码时,通常只会接触到 Publisher 这个接口,对应到 Reactor 便是 MonoFlux

对于 SubscriberSubcription 这两个接口,Reactor 也有相应的实现。这些都是 Spring WebFluxSpring Data Reactive 这样的框架用到的。如果 不开发中间件,开发人员是不会接触到的。

Reactor入门

接下来介绍一下 ReactorMonoFlux 这两个类中的主要方法的使用。

如同 Java 8 所引入的 Stream 一样,Reactor 的使用方式基本上也是分三步:

  • 开始阶段的创建
  • 中间阶段的处理
  • 最终阶段的消费

只不过创建和消费可能是通过像 Spring 5 这样框架完成的(比如通过 WebFlux 中的 WebClient 调用 HTTP 接口,返回值便是一个 Mono)。但我们还是需要基本了解这些阶段的开发方式。

1. 创建 Mono 和 Flux(开始阶段)

使用 Reactor 编程的开始必然是先创建出 MonoFlux。有些时候不需要我们自己创建,而是实现例如 WebFlux 中的 WebClientSpring Data Reactive 得到一个 MonoFlux

  • 使用 WebFlux WebClient 调用 HTTP 接口
WebClient webClient = WebClient.create("http://localhost:8080");
public Mono<User> findById(Long userId) {
    return webClient
            .get()
            .uri("/users/" + userId)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMap(cr -> cr.bodyToMono(User.class));
}
  • 使用 ReactiveMongoRepository 查询 User
public interface UserRepository extends ReactiveMongoRepository<User, Long> {
    Mono<User> findByUsername(String username);
}

但有些时候,我们也需要主动地创建一个 MonoFlux

普通的创建方式

Mono<String> helloWorld = Mono.just("Hello World");
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);

这样的创建方式在什么时候用呢?一般是用在经过一系列 非IO型 操作之后,得到了一个对象。接下来要基于这个对象运用 Reactor 进行 高性能IO 操作时,可以用这种方式将之前得到的对象转换为 MonoFlux

文艺的创建方式

上面是通过一个 同步调用 得到的结果创建出 MonoFlux,但有时需要从一个 Reactive异步调用 的结果创建出 MonoFlux

如果这个 异步方法 返回一个 CompletableFuture,那可以基于这个 CompletableFuture 创建一个 Mono

Mono.fromFuture(completableFuture);

如果这个 异步调用 不会返回 CompletableFuture,是有自己的 回调方法,那怎么创建 Mono 呢?可以使用 static <T> Mono<T> create(Consumer<MonoSink<T>> callback) 方法:

Mono.create(sink -> {
    ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
    entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
        @Override
        public void onSuccess(ResponseEntity<String> result) {
            sink.success(result.getBody());
        }

        @Override
        public void onFailure(Throwable ex) {
            sink.error(ex);
        }
    });
});

在使用 WebFlux 之后,AsyncRestTemplate 已经不推荐使用,这里只是做演示。

2. 处理 Mono 和 Flux(中间阶段)

中间阶段的 MonoFlux 的方法主要有 filtermapflatMapthenzipreduce 等。这些方法使用方法和 Stream 中的方法类似。

下面举几个 Reactor 开发实际项目的问题,帮大家理解这些方法的使用场景:

问题一: map、flatMap 和 then 在什么时候使用

本段内容将涉及到如下类和方法:

  • 方法Mono.map()
  • 方法Mono.flatMap()
  • 方法Mono.then()
  • Function

MonoFlux 中间环节的处理过程中,有三个有些类似的方法:map()flatMap()then()。这三个方法的使用频率很高。

  • 传统的命令式编程
Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);
  • 对应的反应式编程
Mono.just(params)
    .flatMap(v -> doStep1(v))
    .flatMap(v -> doStep2(v))
    .flatMap(v -> doStep3(v));

从上面两段代码的对比就可以看出来 flatMap() 方法在其中起到的作用,map()then() 方法也有类似的作用。但这些方法之间的区别是什么呢?我们先来看看这三个方法的签名(以 Mono 为例):

  • flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
  • map(Function<? super T, ? extends R> mapper)
  • then(Mono other)
then()

then() 看上去是下一步的意思,但它只表示执行顺序的下一步,不表示下一步依赖于上一步。then() 方法的参数只是一个 Mono,无从接受上一步的执行结果。而 flatMap()map() 的参数都是一个 Function,入参是上一步的执行结果。

flatMap() 和 map()

flatMap()map() 的区别在于,flatMap() 中的入参 Function 的返回值要求是一个 Mono 对象,而 map 的入参 Function 只要求返回一个 普通对象。在业务处理中常需要调用 WebClientReactiveXxxRepository 中的方法,这些方法的 返回值 都是 Mono(或 Flux)。所以要将这些调用串联为一个整体 链式调用,就必须使用 flatMap(),而不是 map()

问题二:如何实现并发执行

本段内容将涉及到如下类和方法:

  • 方法Mono.zip()
  • Tuple2
  • BiFunction

并发执行 是常见的一个需求。Reactive Programming 虽然是一种 异步编程 方式,但是 异步 不代表就是 并发并行 的。

传统的命令式编程 中,并发执行 是通过 线程池Future 的方式实现的。

Future<Result1> result1Future = threadPoolExecutor.submit(() -> doStep1(params));
Future<Result2> result2Future = threadPoolExecutor.submit(() -> doStep2(params));
// Retrive result
Result1 result1 = result1Future.get();
Result2 result2 = result2Future.get();
// Do merge;
return mergeResult;

上面的代码虽然实现了 异步调用,但 Future.get() 方法是 阻塞 的。在使用 Reactor 开发有 并发 执行场景的 反应式代码 时,不能用上面的方式。

这时应该使用 MonoFlux 中的 zip() 方法,以 Mono 为例,代码如下:

Mono<CustomType1> item1Mono = ...;
Mono<CustomType2> item2Mono = ...;
Mono.zip(items -> {
    CustomType1 item1 = CustomType1.class.cast(items[0]);
    CustomType2 item2 = CustomType2.class.cast(items[1]);
    // Do merge
    return mergeResult;
}, item1Mono, item2Mono);

上述代码中,产生 item1Monoitem2Mono 的过程是 并行 的。比如,调用一个 HTTP 接口的同时,执行一个 数据库查询 操作。这样就可以加快程序的执行。

但上述代码存在一个问题,就是 zip() 方法需要做 强制类型转换。而强制类型转换是 不安全的。好在 zip() 方法存在 多种重载 形式。除了最基本的形式以外,还有多种 类型安全 的形式:

static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator); 
static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3);

对于不超过 7 个元素的合并操作,都有 类型安全zip() 方法可选。以两个元素的合并为例,介绍一下使用方法:

Mono.zip(item1Mono, item2Mono).map(tuple -> {
    CustomType1 item1 = tuple.getT1();
    CustomType2 item2 = tuple.getT2();
    // Do merge
    return mergeResult;
});

上述代码中,map() 方法的参数是一个 Tuple2,表示一个 二元数组,相应的还有 Tuple3Tuple4 等。

对于两个元素的并发执行,也可以通过 zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator) 方法直接将结果合并。方法是传递 BiFunction 实现 合并算法

问题三:集合循环之后的汇聚

本段内容将涉及到如下类和方法:

  • 方法Flux.fromIterable()
  • 方法Flux.reduce()
  • BiFunction

另外一个稍微复杂的场景,对一个对象中的一个类型为集合类的(ListSet)进行处理之后,再对原本的对象进行处理。使用 迭代器模式 的代码很容易编写:

List<SubData> subDataList = data.getSubDataList();
for (SubData item : subDataList) {
    // Do something on data and item
}
// Do something on data

当我们要用 Reactive 风格的代码实现上述逻辑时,就不是那么简单了。这里会用到 Fluxreduce() 方法。reduce() 方法的签名如下:

  • <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);

可以看出,reduce() 方法的功能是将一个 Flux 聚合 成一个 Mono

  • 第一个参数: 返回值 Mono 中元素的 初始值

  • 第二个参数: 是一个 BiFunction,用来实现 聚合操作 的逻辑。对于泛型参数 <A, ? super T, A> 中:

    • 第一个 A: 表示每次 聚合操作 之后的 结果的类型,它作为 BiFunction.apply() 方法的 第一个入参
    • 第二个 ? super T: 表示集合中的每个元素的类型,它作为 BiFunction.apply() 方法的 第二个入参
    • 第三个 A: 表示聚合操作的 结果,它作为 BiFunction.apply() 方法的 返回值

接下来看一下示例:

Data initData = ...;
List<SubData> list = ...;
Flux.fromIterable(list)
    .reduce(initData, (data, itemInList) -> {
        // Do something on data and itemInList
        return data;
    });

上面的示例代码中,initDatadata 的类型相同。执行完上述代码之后,reduce() 方法会返回 Mono<Data>

3. 消费 Mono 和 Flux(结束阶段)

直接消费的 MonoFlux 的方式就是调用 subscribe() 方法。如果在 WebFlux 接口中开发,直接返回 Mono 或 Flux 即可。WebFlux 框架会完成最后的 Response 输出工作。

小结

本文介绍了反应式编程的一些概念和 Spring Reactor 框架的基本用法,还介绍了如何用 Reactor 解决一些稍微复杂一点的问题。ReactorSpring 5 中有大量的应用,后面会给大家分享一些 Spring Reactor 实战系列的博客。


欢迎关注技术公众号: 零壹技术栈

零壹技术栈

本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。