阅读 67

自语之Reactor中Flux&Mono的粗略使用

最近,需要快速使用Reactor的的两个类Flux和Mono中的的方法进行开发。在搜索半天之后,发现大部分都是一些转载的文章,笔者点了好几个不同的网站,看到的却是同一篇文章。

在此,笔者不讲过多的原理,以实践为主。

比如,讲一些笔者使用过的Flux&Mono中方法,以及这些方法的使用场景…..。

Flux&Mono的生产:
  • Flux是Reactor中的多元流,一次可以产生(发射)多个元素(对象)。类似于Java中的集合类(List)。
  • Mono是Reactor单个元素,一次只能产生0或者一个元素(对象)。类似于Java中的pojo普通对象。

接下来看看, 如何创建Flux&和Mono:

  • 关于Flux的创建方式:just + generate + empty
Flux<Map> flux1 = Flux.just(Maps.newHashMap()); // 集合创建
Flux<Integer> flux2 = Flux.just(1, 2, 3, 4); // 包装类创建
Flux<String> flux3 = Flux.just("1", "2", "3"); // 字符串创建
Flux flux4 = Flux.empty(); // 返回一个空数据
// 使用generate方法创建
Flux.generate(create -> {
  create.next("1"); // 每次只能接受一个值
  create.next("2");
  create.complete(); // 创建完成
});
复制代码
  • 关于Mono的创建:just + justOrEmpty + empty
Mono<Map> mono1 = Mono.just(Maps.newHashMap()); // 集合创建
Mono<Integer> mono2 = Mono.just(1); // 包装类创建
Mono<String> mono3 = Mono.just("1"); // 字符串创建
Mono.justOrEmpty(null); // 参数一个null或者非空的数据
Mono.justOrEmpty(1);
Mono.empty(); // 返回空数据
复制代码
Flux&Mono的消费:
  • 不管是Flux还是Mono,创建之后需要触发消费逻辑,才能对产生的元素进行处理。
  • 所以, 这里需要调用subscribe方法,才可以对产生的元素进行消费(处理)。
Flux<Integer> flux2 = Flux.just(1, 2, 3, 4); // 包装类创建
flux2.subscribe(System.out::println);
// 打印结果:1 2 3 4

Mono<Integer> Mono1 = Mono.just(1); // 包装类创建
Mono1.subscribe(System.out::println);
// 打印结果: 1
复制代码
  • 通过查看消费方法(subscribe)的的方法的参数列表,可以看出还存在多种的消费方式:
    • Consumer:消费逻辑, 可以传入系统处理函数, 可以传递自定义处理的Lambda表达式
    • errorConsumer:异常处理逻辑, 当产生异常的时候,需要传入的处理逻辑。
    • completeConsumer:完成处理逻辑,也叫完成信号。
  • 思维类比:整个Flux&Mono生产和消费过程, 有点类似Java中的try-catch-finally
    • try -----Flux&Mono中的组合操作(filter、filterMap、zipWith…)
    • catch -----errorConsumer异常处理逻辑。
    • finally ----completeConsumer, 完成信号。
消费函数subscribe的使用:
  • 下面是reactor中subscribe方法的两个重载。(当然还有些重载的subscribe方法,笔者也没怎么用过,所以在此就不罗列出来了)
// 消费一
public final Disposable subscribe(Consumer<? super T> consumer);

// 消费二:
public final Disposable subscribe(
  @Nullable Consumer<? super T> consumer,
  @Nullable Consumer<? super Throwable> errorConsumer,
  @Nullable Runnable completeConsumer);
  
复制代码
  • 对于消费一的consumer, 我们可以传入一个函数(System.out::println),也可以传入一个Lamdba表达式。

    • 消费一(上文)的consumer,传入item -> {}进行消费, 效果如下:
    Flux.range(1, 10) // 产生 [1, 10]的整数
      // 过滤
      .map(item -> {
        // 偶数翻倍
        if(item % 2 == 0) {
          return item * 2;
        }
        return item;
      })
      // 消费
      .subscribe(item -> {
        System.out.print(item + " ");
      });
    //打印结果:1 4 3 8 5 12 7 16 9 20 
    复制代码
    • 消费二多出errorConsumer, completeConsumer两个参数, 都可以传入landba表达式进行使用:
      • info -> log.error("这是一个异常, 信息为:", info):一般使用该方式进行记录异常
      • () -> { doSomething….}:作为完成信号。
    Flux.range(1, 10)
      // 过滤
      .map(item -> {
        // 偶数翻倍
        if(item % 2 == 0) {
          return item * 2;
        }
        return item;
      })
      // 消费
      .subscribe(item -> {
        System.out.print(item + " ");
      }, info -> {
        System.out.print("异常处理");
      }, () -> {
        System.out.println("完成处理");
      });
    // 打印结果:
    // 1 4 3 8 5 12 7 16 9 20 完成处理
    复制代码
    • 在上面的代码中,如果发生异常怎么办?在Flux&Mono中,异常处理&完成处理都属于终止信号, 所以程序一旦出现异常,那么就执行异常处理, 并不会执行完成处理。如果未出现异常, 则只有执行完成处理。
    Flux.range(1, 10)
      // 过滤
      .map(item -> {
        // 偶数翻倍
        if(item == 7) { // 到7进行异常抛出
          throw new RuntimeException();
        }
        return item;
      })
      // 消费
      .subscribe(item -> {
        System.out.print(item + " ");
      }, info -> {
        System.out.print("异常");
      }, () -> {
        System.out.println("完成");
      });
    // 1 2 3 4 5 6 异常
    复制代码
组合处理:map&flatMap
  • 两者都可以对Flux&Mono中的元素进行映射。

  • 区别:

    • map:传入的表达式,可以返回一个普通的pojo对象。
    • flatMap:传入的表达式,只能返回一个Mono&Flux对象。
    String[] arr = {"张三", "22"};
    List<String> arr2 = Arrays.asList(arr);
    Flux.just(arr2)
      .flatMap(obj -> {
        P person = new P();
        // 返回mono对象
        return Mono.just(person);
      }).map(obj -> {
      P person = new P();
      // 返回普通对象
      return person;
    }).subscribe();
    复制代码
背压的概念:

www.zhihu.com/question/49… (第一个回答)

流水线的概念:
  • 在Flux&Mono中, 组合处理如果存在返回值, 那么该返回值是往下游(下一个组合处理)传递。

    Flux.range(1, 10)
      .flatMap(A -> {
        return B;
      }).map(B -> {
      return C;
    }).subscribe(C -> {
      dosomething...
    });
    // 上游flatMap 返回B
    // map接受的参数, 为B, 并返回C
    // subscribe接受的参数为C
    // 从上往下, 可以看做是一个流水线, flatMap&map可以看做是流水线上的工人,subscribe则是生产出来的产品。
    复制代码
关注下面的标签,发现更多相似文章
评论