顾名思义,Stream 就是流的意思,表示发出的一系列的异步数据。可以简单地认为 Stream 是一个异步数据源。它是 Dart 中处理异步事件流的统一 API。
集合与Stream Dart 中,集合(Iterable或Collection)表示一系列的对象。而 Stream (也就是“流”)也表示一系列的对象,但区别在于 Stream 是异步的事件流。比如文件、套接字这种 IO 数据的非阻塞输入流(input data),或者用户界面上用户触发的动作(UI事件)。
推和拉就是别人给你还是你自己去拿的区别。但是不管如何获取数据,二者的本质都可以认为是数据的集合(数据可能无限多)。所以,二者有很多相同的方法,稍后介绍。
怎么理解 Stream 中的数据? 数据(data)是个非常抽象的概念,可以认为一切皆数据。在程序的世界里,其实只有两种东西:数据和对数据的操作。对数据的操作就是对输入的数据经过一些计算,之后输出一些新数据。事件(event,如UI上的事件)、计算结果(value,如函数/方法的返回值)以及从文件或网络获得的纯数据都可以认为是数据(data)。另外,Dart 中的所有事物都是对象,所以数据也一定是某种对象(object)。在本文中,可以认为事件、结果、数据、对象都是一样的,不用特意区分。
Stream 与 Future
基本使用
获得 Stream Dart 中统一使用 Stream 处理异步事件流,所以可以获得 Stream 的地方很多。为了方便演示,这里先介绍2种获取 Stream 的方式。
1. 将集合(Iterable)包装为 Stream Stream 有3个工厂构造函数:fromFuture、fromIterable 和 periodic,分别可以通过一个 Future、Iterable或定时触发动作作为 Stream 的事件源构造 Stream。下面的代码就是通过一个 List 构造的 Stream。
var data = [1, 2, 3, 4];
var stream = new Stream.fromIterable(data);
对集合的包装只是简单地模拟异步,定时触发、IO输入、UI事件等现实情况才是真正的异步事件。 2. 使用 Stream 读文件 读文件的方式有多种,其中一种是使用 Stream 获得文件内容。File 的方法 openRead() 返回一个
var stream = new File(new Options().script).openRead();
订阅 Stream 当你有了一个 Stream 时,最常用的功能就是通过
我们在接收一个输入流的时候要面临几种不同的情况和状态,最基本的是处理收到数据,此外上游还可能出现错误,以及出现错误时是否继续后续数据的处理,最后在输入完成的时候还有一个结束状态。所以 listen 方法的几个参数分别对应这些情况和状态:
- onData,处理收到的数据的 callback
- onError,处理遇到错误时的 callback
- onDone,结束时的通知 callback
- unsubscribeOnError,遇到第一个错误时是否停止(也就是取消订阅),默认为false
下面我们订阅一个 Stream 的数据,收到数据时只是简单地打印出来:
var data = [1, 2, 3, 4];
var stream = new Stream.fromIterable(data);
stream.listen((e)=>print(e), onDone: () => print('Done'));
// => 1, 2, 3, 4
// => Done
上面的代码会先打印出从 Stream 收到的每个数字,最后打印一个‘Done'。 当 Stream 中的所有数据发送完时,就会触发 onDone 的调用,但提前取消订阅不会触发 onDone 。在结束的同时(收到 onDone 事件之前),所有的订阅者都被取消了订阅,此时 Stream 上便没有订阅者了。允许对一个已经结束了的 Stream 再添加订阅者(尽管没什么意义),此时只会立刻收到一个 onDone 事件。
stream.listen(print, onDone: () {
print('first done');
//listen again
stream.listen(print, onDone:() => print('second done'));
});
// => data: 1,2,3,4,
// => first done
// => no data, because stream is done
// => second done
上面的代码中,首先我们在 onDone 的回调中打印了 'first done' 表示第一次结束。此时 stream 上已经没有订阅者了,但接着我们又再次订阅了这个 stream。这一次没有再收到数据,而是马上打印出了 ‘second done’ 表示第二次订阅的结束。 高级订阅管理 前面的示例代码会处理 Stream 发出的所有数据,直到 Stream 结束。如果想提前取消处理怎么办?listen() 方法会返回一个
var sub = stream.listen(null);
sub.onData(print);
sub.onError((e)=>print('error $e'));
sub.onDone(()=>print('done'));
// => 1, 2, 3, 4, done
上面的代码与前面的 listen 示例代码作用相同。
var sub = stream.listen(null);
sub.onData((e){
if(e > 2)
sub.cancel();
else
print(e);
});
sub.onDone(()=>print('done'));
// => 1, 2
// no 'done', because stream is cancel.
上面的代码最后会打印出1和2,但不会打印出‘done' 。首先,listen 中的参数为 null,也就是没有订阅者。然后,通过 listen 的返回者 subscription 对象设置了 onData 和 onDone 的处理,这时才有了订阅者。在 onData 中,如果收到的数字大于2就取消后续处理,因此到数字 3 的时候就没有打印 3,而是立即结束了处理,这样后面的 4 也不会出现了。既然是提前退出,所以 onDone 也是不会触发的。 Stream 两种订阅模式 Stream有两种订阅模式:单订阅(single)和多订阅(broadcast)。单订阅就是只能有一个订阅者,而广播是可以有多个订阅者。这就有点类似于消息服务(Message Service)的处理模式。单订阅类似于点对点,在订阅者出现之前会持有数据,在订阅者出现之后就才转交给它。而广播类似于发布订阅模式,可以同时有多个订阅者,当有数据时就会传递给所有的订阅者,而不管当前是否已有订阅者存在。
Stream
assert(stream.isBroadcast == false);
stream.first.then(print);
stream.last.then(print);// Bad state: Stream already has subscriber.
上的代码需要分别打印出 stream 的第一个数据和最后一个数据,但是单模式 Stream 只能订阅一次,所以直接出错了。当然,Stream 是异步的,所以 first 也没有打印出来。
var bs = stream.asBroadcastStream();
assert(bs.isBroadcast == true);
bs.first.then(print);
bs.last.then(print);
// OK => 1, 4
上面的代码,我们把单模式 Stream 转成了多订阅的 Stream,所以可以 first 和 last 都打印出来了。 按前面说的,单订阅模式会持有数据,多订阅模式如果没有及时添加订阅者则可能丢数据。不过具体取决于 stream 的实现。
new Timer(new Duration(seconds:5), ()=>stream.listen(print));
// after 5 second, it output 1,2,3,4
上面的代码利用 Timer 延迟了5秒才订阅 stream,但仍然输出了数据。因为我们这里的这个
var bs = stream.asBroadcastStream();
new Timer(new Duration(seconds:5), ()=>bs.listen(print));
// after 5 second, it also output 1,2,3,4
// because asBroadcastStream() is a simple wrap,
// it don't change the source stream's feature
上面我们把原始的单订阅模式转成了多订阅模式的 Stream,此时可以添加多个订阅者。我们5秒后才在 broadcast stream 上添加了订阅者,但它依然输出了 1,2,3,4 ,并没有漏掉数据。这其实是因为 asBroadcastStream() 只是对原始 stream 的封装,并不改变原始 stream 的实现特性。所以这个
var bs = stream.asBroadcastStream();
// add first listener
new Timer(new Duration(seconds:5), ()=>bs.listen(print));
// after 5 second, it output 1,2,3,4
// add second listener
new Timer(new Duration(seconds:10), ()=>bs.listen(print));
// after 10 second, nothing output, because stream is done
再来看另外一个例子,我们自己来创建一个 Stream。StreamController 用于创建 Stream,它有两个构造函数,分别用于创建单订阅模式 Stream 和 多订阅模式 Stream。然后可以利用 add()、addError() 和 close() 方法发送事件、发送错误和结束,这三个方法来自
// build single stream
//var controller = new StreamController();
// build broadcast stream
var controller = new StreamController.broadcast();
//send event
controller..add(1)
..add(2)
..add(3)
..add(4);
//send done
controller.close();
var myStream = controller.stream;
new Timer(new Duration(seconds:5), ()=>myStream.listen(print));
//if myStream is single stream, it output 1,2,3,4
//if myStream is broadcast stream, it output nothing, because stream is done.
Stream 的集合特性 前面说过,Stream 和一般的集合类似,都是一组数据,只不过一个是异步推送,一个是同步拉取。所以他们都很多共同的方法。例如:
stream.any((e) => e > 2).then(print);// stream.any()
print([1,2,3,4].any((e) => e > 2));// iterable.any()
// => true, true
比如 Stream 和 集合 都有 any() 方法,集合是同步的(但是惰性执行,这里因为有 print 调用,所以立刻执行了)并直接返回结果, Stream 上的 any() 方法是异步的,返回的是 Future 。方法本身的含义都是一样的。上面的代码虽然 stream 的 any 方法在前,但因为是异步的,所以的输出在后。 在列举其它 Stream 和 Iterable 通用的方法:
//常见集合方法
stream.first.then(print);
stream.firstWhere((e)=>e>3, defaultValue:()=>0).then(print);
stream.last.then(print);
stream.lastWhere((e)=>e>3, defaultValue:()=>0).then(print);
stream.length.then(print);
stream.isEmpty.then(print);
stream.any((e) => e > 2).then(print);
stream.every((e) => e > 2).then(print);
stream.contains(3).then(print);
stream.elementAt(2).then(print);
stream.where((e) => e >2).listen(print);
stream.skip(2).listen(print);
stream.skipWhile((e) => e < 2).listen(print);
stream.take(2).listen(print);
stream.takeWhile((e)=>e<3).listen(print);
stream.map((e) => e*2).listen(print);
stream.reduce(0, (p, c) => p + c).then(print);
stream.expand((e) => [e, e]).listen(print);
stream.toList().then(print);
stream.toSet().then(print);
注意以上方法同时只能使用一次,因为是单订阅模式。此外,如果方法只有一个返回值,即数据收敛类型的方法,那么返回就是一个 Future。如果是只是数据转换的方法,如 map ,返回的还是一个 Stream,只是数据数据的类型和数量变了。看到这么多 Stream 与 Iterable 相同的方法,大家应该更清楚 Stream 其实也是个数据集合。 通用数据收敛方法 集合中有很多方法只返回一个值,多个数据作为输入、一个数据作为输出的方法就是数据收敛的方法。Stream 有一个更通用的收敛方法 pipe() 。pipe() 方法的参数要求是一个 StreamConsumer 接口的实现,该接口只有一个方法: Future consume(Stream
class DataConsumer implements StreamConsumer{
Future consume(Stream stream){
return stream.reduce(0, (c,p)=>c+p);
}
}
stream.pipe(new DataConsumer()).then(print);
// => 10
// equivalent below
stream.reduce(0, (p, c) => p + c).then(print);
上面我们自己实现了一个 通用数据转换方法 除了数据收敛方法,Stream 也有自己通用的数据转换方法 transform() 。类似于 Future 的连续调用,Stream 也可以连续调用。 transform 方法就是把一个 Stream 作为输入,然后经过计算或数据转换,输出为另一个 Stream。另一个 Stream 中的数据类型可以不同于原类型,数据多少也可以不同(比如实现一个数据的 buffer )。
transform 的方法签名是:
下面我们构造一个
var transformer = new StreamTransformer(
handleData: (e, sink){
sink.add(e*2);
}
);
stream.transform(transformer).listen(print);
// equivalent below
stream.map((e) => e*2).listen(print);
或
class MyTransformer extends StreamEventTransformer {
handleData(e, sink){
sink.add(e*2);
}
}
stream.transform(new MyTransformer()).listen(print);
使用 StreamTransformer 接口的工厂构造函数 或者 举个更实用点例子,Dart 中的 StringDecoder 和 StringEncoder 就是一个
file.openRead()
.transform(new StringDecoder())
.transform(new LineTransformer())
.listen(your_process);
注意,不管是 Stream.map() 还是 Stream.transform() ,他们都是在做转换,而非订阅。对于单模式 Stream ,如果没有添加订阅者,那么转换方法根本不会执行(可能是由于是惰性执行的缘故)。
stream.map((e){
print(e);
return e*2;
});
// nothing output, because lazy evaluate
class MyTransformer extends StreamEventTransformer {
handleData(e, sink){
print(e);
sink.add(e*2);
}
}
stream.transform(new MyTransformer());
// nothing output, because no subscription
上面的示例中,都在转换过程中做了输出,但实际不会输出内容,因为没有用 listen 添加订阅者。