Flutter | 状态管理拓展篇——RxDart(四)

25,319 阅读7分钟

前言

在前一篇文章向大家介绍了一种新的状态管理方式——BLoC,它在分离我们的ui逻辑与业务逻辑上表现十分优秀。但是在最后我们发现了一个问题。

bloc是一个典型的观察者模式,我们以counter bloc举例,在A,B页面都存在观察者,它们监听的是同一个广播流,当我们pop B页面,回到A页面这个操作不会出现任何问题,而当我们再次进入B页面的时候却发现,它显示了初始值0,而不是我们想要的value,只有等我们再次按下按钮时,它才能刷新获得实际的value。

Stream很棒,但是还不够强大

所以今天要给大家简单介绍下ReactiveX的dart 实现——RxDart,它极大的扩展了Stream的功能,能够让我们在使用bloc的时候更加游刃有余。

在正式开始介绍前,我希望您已经阅读并理解了stream的相关知识,后面的内容都基于此。如果您还未了解过dart:stream 的话,我建议您先阅读这篇文章:Dart:什么是Stream

RxDart

ReactiveX是什么

ReactiveX是一个强大的库,用于通过使用可观察序列来编写异步基于事件的程序。它突破了语言平台的限制,让我们编写异步程序就像在自家花园散步那样 easy。我相信你一定会爱上它!

基本概念

Dart:什么是Stream这篇文章中,我用到一个模型来理解stream里面到底发生了什么。今天我们还是利用这个模型来看看,在rxdart中它是什么样的。

这个模式的关键思维在于观察者的无状态。我们平时调用方法的时候一定是很清楚我们什么时候调用,并立刻会返回一个预想的结果。

但是在这里,我们中间进行处理的时候,完全是处于异步状态的,也就是说无法立刻返回一个值。我们不知道stream什么时候会“吐”出处理结果,所以必须要一个观察者来守着这个出口。

当有事件/数据流出时,观察者捕捉到了这个事件并解析处理。

  • Subject实现并扩展了StreamController,它符合StreamController的所有规范。假如您之前使用的StreamController,那么你可以直接替换为Subject。你可以把它想像成streamController。
  • Observable实现并扩展了Stream。它将常用的stream和streamTransformer组合成了非常好用的api。你可以把它想像成stream。

可观察对象——Observable

创建Observavle

你可以把stream直接包装成Observable

  var obs = Observable(Stream.fromIterable([1,2,3,4,5]));
  
  obs.listen(print);

输出:1 2 3 4 5

通过Future创建:fromFuture

 var obs = Observable.fromFuture(new Future.value("Hello"));
 
  obs.listen(print); 

输出:Hello

通过Iterable创建:fromIterable

var obs = Observable.fromInterable([1,2,3,4,5]);

obs.listen(print);

输出:1 2 3 4 5

让流的“吐”出间隔一段时间:interval

interval方法能够让流“吐出数据”后间隔一段时间再吐下一个数据。

  var obs = Observable(Stream.fromIterable([1,2,3,4,5]))
    .interval(new Duration(seconds: 1));

  obs.listen(print);

输出:1 ... 2 ... 3 ... 4 ... 5

其中...代表停顿了一秒。

迭代地处理数据:map

map方法能够让我们迭代的处理每一个数据并返回一个新的数据

 var obs = Observable(Stream.fromIterable([1,2,3,4,5]))
    .map((item)=>++item);
    
obs.listen(print);

输出:2 3 4 5 6

扩展流:expand

expand方法能够让我们把把每个item扩展至多个流

 var obs = Observable(Stream.fromIterable([1,2,3,4,5]))
   .expand((item)=> [item,item.toDouble()]);

 obs.listen(print);

输出:1 1.0 2 2.0 3 3.0 4 4.0 5 5.0

这里我们将每个数据扩展成【item,item.toDouble】你可以扩展成任意组的流。假如这是一个广播Observable,并被多次收听,那么他可以单独调用expand并扩展自己。

合并流:merge

merge方法能够让我们合并多个流,请注意输出。

  var obs = Observable.merge([
    Stream.fromIterable([1,2,3]),
    Stream.fromIterable([4,5,6]),
    Stream.fromIterable([7,8,9]),
  ]);

  obs.listen(print);

输出:1 4 7 2 5 8 3 6 9

顺序执行多个流:concat

concat方法能够让我们按照顺序执行一组流,当一组流执行完毕后,再开始执行下一组。

  var obs = Observable.concat([
    Stream.fromIterable([1,2,3]),
    Stream.fromIterable([4,5,6]),
    Stream.fromIterable([7,8,9]),
  ]);

  obs.listen(print);

输出:1 2 3 4 5 6 7 8 9

检查每一个item:every

every会检查每个item是否符合要求,然后它将会返回一个能够被转化为 Observable 的 AsObservableFuture< bool>。

  var obs = Observable.fromIterable([1,2,3,4,5]);

  obs.every((x)=> x < 10).asObservable().listen(print);

输出结果:true

关于Observable你还需要知道这些

  • Dart中 Observables 默认是单一订阅。如果您尝试两次收听它,则会抛出 StateError 。你可以使用工厂方法或者 asBroadcastStream 将其转化为多订阅流。
  var obs = Observable(Stream.fromIterable([1,2,3,4,5])).asBroadcastStream();
  • 很多方法的返回值并不是一个 Single 也不是一个 Observable 而是必须返回一个Dart的 Future。幸运的是你很容易找到一些方法,把他们转化成回 stream
  • 出现错误时,Dart中的Stream不会默认关闭。但是在Rxdart中,Error会导致Observable终止,除非它被运算符拦截。
  • 默认情况下Dart中Stream是异步的,而Observables默认是同步的。
  • 在处理多订阅Observable的时候,onListen方法只有在第一次会被调用。且各个订阅者之间不会互相干涉。
  var obs = Observable(Stream.fromIterable([1,2,3,4,5])).asBroadcastStream();

//第一个订阅者
  obs.interval(Duration(seconds: 1)).map((item) => ++item).listen(print);
//第二个订阅者
  obs.listen(print);

输出:1 2 3 4 5 2 3 4 5 6

以上是一些比较常见的Observable的使用方法,它并不完整,我将会在以后持续的更新这篇文章,并完整介绍它的功能

增强版StreamController——Subject

普通广播流控制器:PublishSubject

PublishSubject就是一个普通广播版StreamController,你可以多次收听,默认是sync是false,也就是说里面是一个AsyncBroadcastStreamController 异步广播流。

缓存最新一次事件的广播流控制器:BehaviorSubject

BehaviorSubject也是一个广播流,但是它能记录下最新一次的事件,并在新的收听者收听的时候将记录下的事件作为第一帧发送给收听者。

还记得我们文章开头的那个小问题吗?在B页面重新收听的时候,获取不到最新的事件,必须等我们重新触发流才可以得到正确的值。

我发誓我绝对不是为了凑篇幅🤣

ok,我们现在用BehaviorSubject替换掉我们的StreamCroller

//var _countController = StreamController.broadcast<int>();

var _subject = BehaviorSubject<int>();

真的就是这么简单,无缝替换😆

代码已上传github,让我们来看看效果

再来看两个例子,相信大家会对BehaviorSubject理解更深刻

例1

  final subject = new BehaviorSubject<int>();

  subject.add(1);
  subject.add(2);
  subject.add(3);

  subject.stream.listen(print); // prints 3
  subject.stream.listen(print); // prints 3
  subject.stream.listen(print);

输出:3 3 3

由于我们在add(3)之后才开始收听,所以将会收到最新的value。

例2

  final subject = new BehaviorSubject<int>(seedValue: 1);

  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);

输出:1 1 1

seedValue作为初始值,在后面有收听者的时候同样会把它当成最后一次的value发送给收听者。

缓存更多事件的广播流控制器:ReplaySubject

ReplaySubject能够缓存更多的值,默认情况下将会缓存所有值,并在新的收听的时候将记录下的事件作为第一帧发送给收听者。

  final subject = ReplaySubject<int>();

  subject.add(1);
  subject.add(2);
  subject.add(3);
  
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);

输出:1 1 1 2 2 2 3 3 3

你还可以通过maxSize控制缓存个数

  final subject = ReplaySubject<int>(maxSize: 2);

  subject.add(1);
  subject.add(2);
  subject.add(3);

  subject.stream.listen(print); // prints 1
  subject.stream.listen(print); // prints 1
  subject.stream.listen(print);

输出:2 2 2 3 3 3

自定义你的Subject

你可以通过自定义一个新的subject继承至Subject类来获得更加个性化的功能。这里就不举栗子了。😝

Subject的释放

当你不再收听Subject,或者Subject不再使用时,请务必释放它。你可以调用subscription的cancel()方法让某个听众取消收听,或者Subject.close(),关闭整个流。

了解更多

下面有一些优秀的文章能够给您更多参考

写在最后

以上便是RxDart篇的全部内容,它只是介绍了部分RxDart的功能,我在之后会逐渐完善它,最终整理完整。

RxDart十分强大,它让你在处理大量异步事件的时候感觉非常舒适。我相信每一个开发者在了解过它之后一定会喜欢上这个好用的库。

如果你在使用rxdart时候有任何好的idea,或是query,欢迎在下方评论区以及我的邮箱1652219550a@gmail.com留言,我会在24小时内与您联系!

下一篇文章将会是flutter状态管理总结篇,敬请关注。