Stream in Dart

2,240 阅读5分钟

大家应该都吃过转盘小火锅吧,情形是这样的的:好多个人坐在一块,围着一条传送带,每个人的位置上都会有一个小火锅,厨师将菜品放到传送带上,这些菜品会随着传送带经过每个人的位置,如果看到你想吃的菜品,则直接拿着放到自己的小火锅里;如果没碰到想吃的,则直接滤过,但传送带会继续将他们传递到下一个人的身边。

上述情况我们需要注意这样几个问题:

1. 我们只有坐在这条传送带旁边,才能吃到上面的菜。

2. 我们想吃的菜品不是立马就能出现到你面前的。如果你想吃肥牛,前提是需要厨师在传送带上面放一盘肥牛,肥牛才会被传送到你旁边,但是你并不知道肥牛什么时候才能到你身边。

3. 传送带上的菜品都是按照厨师放置的顺序依次被传送到你身边的。

Stream

那么什么是 Stream 呢?上面的传送带就是 Stream,传送带上面可以传递任何菜品,相应的 Stream 也就可以传递任何数据类型。

为了方便的控制 Stream ,我们通常使用 StreamControllerStreamController 中提供了两个属性,一个是用来往 Stream 中添加数据的 sink,一个是用于接收数据的 stream

上面说了,只有坐在传送带旁边才能吃到传送带上的菜品,这里,我们也只需要使用streamController.stream.listen(...) 就可以收到通知 (当 Stream上有数据的时候)。

只要我们坐在传送带前,我们就成了消费者,对应的,当我们 listen 了一个 Stream 的时候,我们就成了一个 StreamSubscription (订阅者)。

最后,当我们不需要这个 Stream 的时候,我们需要将其 close 掉,使用streamController.close()

下面我们用一个例子来演示下:

import 'dart:async';  
  
void main() {  
  // 声明一个 StreamController  
  StreamController controller = StreamController();  
  
  // 监听此 Stream  
  StreamSubscription subscription1 =  
      controller.stream.listen((value) => print('$value'));  
  
  // 往 Stream 中添加数据  
  controller.sink.add(0);  
  controller.sink.add('a, b, c, d');  
  controller.sink.add(3.14);  
  
  // 关闭 StreamController  
  controller.close();  
}

输出:

0
a, b, c, d
3.14

上述代码中,我们声明了一个 StreamController,然后往往里面放置了三种不同的数据类型,当然,我们也可以使用泛型的方式来限制里面的数据类型:

StreamController<int> controller = StreamController<int>();

接下来我们看看 stream.listne(...) 这个方法,源码如下:

StreamSubscription<T> listen(void onData(T event),  
  {Function onError, void onDone(), bool cancelOnError});

可以看出 listen(...) 方法接受一个必选参数和三个可选参数,我们上一个例子中只是传递了必选参数 onData(...),其他额三个参数并没有传递,下面我们来举例说明 listen(...) 方法中各个参数的用途。

import 'dart:async';  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .listen(onData, onError: onError, onDone: onDone, cancelOnError: true);  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  // 发送一个 Error  
  controller.sink.addError(-1);  
  controller.sink.add(2);  
  
  controller.close();  
}  
  
void onData(int data) {  
  print('The value is $data');  
}  
  
void onError(err) {  
  print('The err is $err');  
}  
  
void onDone() {  
  print('The stream is done !');  
}

输出:

The value is 0
The value is 1
The err is -1

从上述代码中,我们可以看到,stream.listen(...) ,接受 4 个参数,这 4 个参数的作用分别如下:

onData(T data) : 用来接收 Stream 中的每一个事件。

onError(...) : 注释上是这样说的 The [onError] callback must be of type void onError(error),也就是所他需要一个接受一个参数的方法。但它还支持接受两个参数的方法 void onError(error, StackTrace stackTrace)

onDone() : 当一个 Stream 关闭了,也就是执行了 stream.close() 方法并且发送了 done 事件,这个方法会被调用。

cancelOnError : 这是一个 bool 类型的值,意思也很简单,就是当 Stream 碰到 Error 事件的时候,是否关闭这个 Stream

我们上述代码中 cancelOnError 参数传递的是 true ,也就是说当Stream遇到 Error 的时候,Stream 就关闭了,下面的事件就不会再发送出去了。我们把上面代码中的 cancelOnError 参数改为 false 其他的代码不变,输出如下:

The value is 0
The value is 1
The err is -1
The value is 2
The stream is done !

可以看出,这个情况下,即使 Stream 中遇到了 Error ,下面的事件依然会接着发送,并且最后的 done 事件也执行了。

Stream 类型

Stream 类型分为两种,分别为 Single-subscription StreamsBroadcast Streams

Single-subscription Streams

这种类型的 Stream 只允许一个订阅者,也就是只能 listen 一次。我们上一小节中的例子就是一个 Single-subscription Streams。接下来我们看看如果我们对这种类型的 Stream 订阅两次会发生什么情况。

import 'dart:async';  
  
void main() {  
  // Single-subscription Streams  
  StreamController<int> controller = StreamController<int>();  
  
  // 第一个订阅者  
  StreamSubscription subscription1 =  
      controller.stream.listen((value) => print('subscription1 $value'));  
  
  // 第二个订阅者  
  StreamSubscription subscription2 =  
      controller.stream.listen((value) => print('subscription2 $value'));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  
  controller.close();  
}

输出:

Unhandled exception:
Bad state: Stream has already been listened to.
...

Broadcast Streams

这种类型的 Stream 允许任意数量的订阅者,只是新的订阅者只能从它开始订阅的时候接收事件。也就是订阅之前 Stream 中的事件是接受不到的。

Broadcast Streams 的声明方式如下:

StreamController<int> controller = StreamController<int>.broadcast();

接下来我们写个示例看看:

import 'dart:async';  
  
void main() {  
  // Broadcast Streams  
  StreamController<int> controller = StreamController<int>.broadcast();  
  
  StreamSubscription sub1 =  controller.stream.listen((value) => print('sub1 value is $value'));  
  controller.sink.add(0);  
  controller.sink.add(1);  
  
  StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));  
  controller.sink.add(2);  
  controller.sink.add(3);  
  
  controller.close();  
}

输出:

sub1 value is 0
sub2 value is 2
sub1 value is 1
sub2 value is 3
sub1 value is 2
sub1 value is 3

可以看出 sub1 可以接收到 Stream 中所有的数据,而 sub2 只能接收到从订阅这个 Stream 之后发送的数据。

StreamTransformer

当数据通过 Stream 传递的时候,我们可以按需来转换里面的数据,Dart 中给我们提供了 StreamTransformer 来对数据做出一些特定转换。

我们可以通过三种方式来实现数据转换:

  • Stream 自带的方法,如 mapwhere
  • 通过 StreamTransformer.fromHandlers(...) 来转换
  • 直接实现一个 StreamTransformer 来定义一个转换器

map、where ...

import 'dart:async';  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .where((value) => value % 2 == 0) // where  
  .map((value) => 'The value is $value') // map  
  .listen((value) => print(value));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  controller.sink.add(2);  
  controller.sink.add(3);  
  controller.sink.add(4);  
  
  controller.close();  
}

输出:

The value is 0
The value is 2
The value is 4

上述代码中,我们使用了 wheremap 转换符。where 将满足条件的值过滤出来,然后 map 将整型的数字转换成字符串类型的值。

StreamTransformer.fromHandlers(...)

import 'dart:async';  
  
// 转换方法  
void handleData(data, EventSink sink) {  
  if (data % 2 == 0) {  
    sink.add(data);  
  }  
}  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .transform(StreamTransformer.fromHandlers(handleData: handleData))  
      .listen((value) => print(value));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  controller.sink.add(2);  
  controller.sink.add(3);  
  controller.sink.add(4);  
  
  controller.close();  
}

输出:

0
2
4

自定义 StreamTransformer

在自定义我们自己的 Transformer 之前,我们先来看看 stream.transform(...) 做了什么事情,毕竟我们是通过 transform(...) 方法传入的 Transformer, transform(...) 源码如下:

Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {  
  return streamTransformer.bind(this);  
}

可以看出 transform 方法返回的依然回一个 Stream ,只不过这个 Stream 是经过转换后的Stream。 类似于 Java8 Stream 中的中间流,就是不停的返回 Stream 的那种。然后该方法是一个泛型方法,泛型类型分别为 T,和 S,可以这样理解,T 为该方法的入参类型,S 为该方法的出参类型,类似于 Java8 中的 Function,我们可以看下 Java 中的 Function 接口的部分源码:

/**  
 * ...
 * @param <T> the type of the input to the function  
 * @param <R> the type of the result of the function  
 * * @since 1.8  
 */
@FunctionalInterface  
public interface Function<T, R> {  
 R apply(T t);
 ...
 }

最后,transform(...) 方法调用了 streamTransformer.bind(this); 该方法返回的是一个新的 Stream,也就是转换后的 Stream,当然,bind() 方法也是我们自定义 StreamTransformer 时需要实现的方法。

我们在之前的例子中声明一个 StreamController 的时候,都是没有传递参数的,其实,StreamController 的构造方法是这样的:

factory StreamController(  
    {void onListen(),  
  void onPause(),  
  void onResume(),  
  onCancel(),  
  bool sync: false}) {  
  return sync  
      ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)  
      : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);  
}

可以看出,StreamController 是有参数的,并且都是可选参数,在这些参数中,我们重点实现 onListen(),关于 onListen 的源码解释如下:

A Stream should be inert until a subscriber starts listening on it (using
the [onListen] callback to start producing events). Streams should not
leak resources (like websockets) when no user ever listens on the stream.

重点就是 onListen 方法是用来生产事件的。

接下来,来实现一个自定义的 StreamTransformer:

/// 自定义一个 StreamTransformer ,
/// 泛型类型 S 为入参类型,T 为出参类型
/// 这些类型都是 Stream 中传递的数据类型
class MyTransformer<S, T> implements StreamTransformer<S, T> {

  // 用来生成一个新的 Stream 并且控制符合条件的数据
  StreamController _controller;

  StreamSubscription _subscription;

  bool cancelOrError;

  // 转换之前的 Stream
  Stream<S> _stream;

  MyTransformer({bool sync: false, this.cancelOrError}) {
    _controller = new StreamController<T>(
        onListen: _onListen,
        onCancel: _onCancel,
        onPause: () {
          _subscription.pause();
        },
        onResume: () {
          _subscription.resume();
        },
        sync: sync);
  }

  MyTransformer.broadcast({bool sync: false, bool this.cancelOrError}) {
    // 定义一个 StreamController,注意泛型类型为 T,也就是出参类型,因为
    // 我们是使用该 _controller 生成一个用来返回的新的 Stream<T>
    _controller = new StreamController<T>.broadcast(
        onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    // _stream 为转换之前的 Stream<S>
    _subscription = _stream.listen(onData,
        onError: _controller.addError,
        onDone: _controller.close,
        cancelOnError: cancelOrError);
  }

  void _onCancel() {
    _subscription.cancel();
    _subscription = null;
  }

  // 数据转换
  void onData(S data) {
    if ((data as int) % 2 == 0) {
      // 将符合条件的数据添加到新的 Stream 中
      _controller.sink.add(data);
    }
  }

  // 参数为转换之前的 Stream<S>
  // 返回的是一个新的 Stream<T> (转换之后的 Stream)
  @override
  Stream<T> bind(Stream<S> stream) {
    this._stream = stream;
    return _controller.stream;
  }

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() {
    // TODO: implement cast
    return null;
  }
}

使用如下:

void main() {
  StreamController<int> controller = StreamController<int>();

  controller.stream
      .transform(new MyTransformer()) // 自定义的 StreamTransformer
      .listen((value) => print('$value'));

  controller.sink.add(0);
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
  controller.sink.add(4);

  controller.close();
}

输出:

0
2
4

如有错误,还请指出。谢谢!!!

参考链接