FLutter 中使用 Stream 和 ValueNotifier 实现跨组件通信和自动取消注册

2,452 阅读3分钟

响应式编程 Stream

每个 future 代表一个单一值,它异步的传递数据或者错误,Stream 的工作方式与 future 类似,不同的是它代表一个单一 事件,随着时间推移,它可以传送零个、多个数据,或者错误。

单一多个
SyncintIterator
AsyncFutureStream

使用 Stream ,当数据送达时、出现错误时、发送完成时,都有相应的回调函数。

使用 StreamController

如果 Stream 的事件不仅来自于异步函数可以遍历的 Stream 和 Future(下面的循环接收 Stream 事件 ),那么我们通常使用 StreamController 来创建和填充 Stream。

StreamController 可以为你生成一个 Stream,并提供在任何时候、任何地方将事件添加到该 Stream 的方法。该 Stream 具有处理监听器和暂停所需的所有逻辑,StreamController可以自行处理数据的收发,一般使用只需返回调用者所需的 Stream 即可。

  final _controller = StreamController();

  Stream<int> get stream => _controller.stream.asBroadcastStream();

  StreamSink<int> get sink => _controller.sink;

	stream.listen(print); // 打印输出整数。

上面可以使用 stream 监听事件,使用 sink 装载事件。

循环接收 Stream 事件

Stream 可以通过许多方式创建,而像使用 for 循环 迭代一个 Iterable 一样,我们也可以使用 异步 for 循环 (通常我们直接称之为 await for)来迭代 Stream 中的事件。例如:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  }

上面代码只是简单地接收整型事件流中的每一个事件并将它们相加,然后返回(被 Future 包裹)相加后的整型值。当循环体结束时,函数会暂停直到下一个事件到达或 Stream 完成。

错误事件

当 Stream 再也没有需要处理的事件时会变为完成状态,与此同时,调用者可以像接收到新事件回调那样接收 Stream 完成的事件回调。当使用 await for 循环读取事件时,循环会在 Stream 完成时停止。

有时在 Stream 完成前会出现错误;比如从远程服务器获取文件时出现网络请求失败,或者创建事件时出现 bug,尽管错误总是会有可能存在,但它出现时应该告知使用者。

Stream 可以像提供数据事件那样提供错误事件。大多数 Stream 会在第一次错误出现后停止,但其也可以提供多次错误并可以在在出现错误后继续提供数据事件。

当使用 await for 读取 Stream 时,如果出现错误,则由循环语句抛出,同时循环结束。你可以使用 try-catch 语句捕获错误。下面的示例会在循环迭代到参数值等于 4 时抛出一个错误:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (var value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw new Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

使用 Stream 跨组件通信

在 flutter 中,组件间传值有多种实现,使用 dart 提供的 Stream 是最简单的方法了。我们可以通过单利模式实现一个数据中心,不同的 stream 去实现对应的事件流。

import 'dart:async';

class DataModel {
  static DataModel _instance;

  static DataModel get instance => _getInstance();

  factory DataModel() => _getInstance();

  DataModel._internal();

  static DataModel _getInstance() {
    if (_instance == null) {
      _instance = DataModel._internal();
    }
    return _instance;
  }

  // ignore: close_sinks
  final _controller = StreamController();

  Stream<int> get stream => _controller.stream.asBroadcastStream();

  StreamSink<int> get sink => _controller.sink;

}

上面就是一个事件中心,在 page1 中接收数据并显示,在 page2 中发送一个随机数。

// page1
  _addDataListener() {
    DataModel().stream.listen((event) {
      print("==_Page1State._addDataListener: event==$event");
      if (mounted) {
        setState(() {
          _num = event;
        });
      }
    });
  }
// page2
_sendData() {
    var nextInt = Random().nextInt(100);
    DataModel().sink.add(nextInt);
  }

进入 page1 ,然后退出,进入 page2 ,发送随机数,控制台打印了 page1 的输出,并警告了内存泄漏的风险。

E/flutter ( 7437): This error might indicate a memory leak if setState() is being called because another object is retaining a reference to this State object after it has been removed from the tree. To avoid memory leaks, consider breaking the reference to this object during dispose().

虽然跨组件可以传递数据,但是没有取消订阅。我们可以手动拿到订阅返回的 StreamSubscription,在 dispose里取消订阅。但是每次这样比较麻烦,另外 stream 多个的时候,就要生成多个 StreamSubscription

使用 ValueNotifier 跨组件通信

ValueNotifier 的 value 不等于旧的值时,,会通知它的监听器。下面我们在 DataModel 里声明一个 ValueNotifier,同样在 page1 里监听,在 page2 里传值。


  ValueNotifier<int> _valueNotifier = ValueNotifier(0);

  ValueNotifier<int> get valueNotifier => _valueNotifier;
  _addValueListener() {
    DataModel().valueNotifier.addListener(() {
      var value = DataModel().valueNotifier.value;
      print("==_Page1State._addDataListener: event==$value");
      if (mounted) {
        setState(() {
          _value = value;
        });
      }
    });
  }
...
Scaffold(
        appBar: AppBar(
          title: Text('PAge1'),
        ),
        body: Center(
          child: Column(
            children: <Widget>[
              Text('接收到stream的数据是:$_num'),
              Text('接收到ValueNotifier的数据是:$_value')
            ],
          ),
        )); // page1
  _sendValueData() {
    var nextInt = Random().nextInt(100);
    DataModel().valueNotifier.value = nextInt;
  } // page2

同样进入 page1 然后去 page2 ,点击按钮发送数据,控制台打印:

I/flutter ( 7437): ==_Page1State._addDataListener: event==2

自动取消订阅

上面的例子都需要自己去实现退出页面的时候取消订阅。属实麻烦,下面我们利用代理模式,去实现自动取消订阅。

class Disposer {
  final List<StreamSubscription> _subscriptions = [];

  final List<Listenable> _listenables = [];
  final List<VoidCallback> _listeners = [];

  /// Track a stream subscription to be automatically cancelled on dispose.
  void autoDispose(StreamSubscription subscription) {
    if (subscription == null) return;
    _subscriptions.add(subscription);
  }

  /// Add a listener to a Listenable object that is automatically removed when
  /// cancel is called.
  void addAutoDisposeListener(Listenable listenable, [VoidCallback listener]) {
    if (listenable == null || listener == null) return;
    _listenables.add(listenable);
    _listeners.add(listener);
    listenable.addListener(listener);
  }

  /// Cancel all listeners added & stream subscriptions.
  ///
  /// It is fine to call this method and then add additional listeners.
  void cancel() {
    for (StreamSubscription subscription in _subscriptions) {
      subscription.cancel();
    }
    _subscriptions.clear();

    assert(_listenables.length == _listeners.length);
    for (int i = 0; i < _listenables.length; ++i) {
      _listenables[i].removeListener(_listeners[i]);
    }
    _listenables.clear();
    _listeners.clear();
  }
}

Disposer里维护一个StreamSubscription列表,可以通过autoDispose添加订阅事件,也可以通过 addAutoDisposeListener添加 ValueNotifier的订阅。

mixin AutoDisposeMixin<T extends StatefulWidget> on State<T>
implements Disposer {
  final Disposer _delegate = Disposer();

  @override
  void dispose() {
    cancel();
    super.dispose();
  }

  void _refresh() => setState(() {});

  @override
  void addAutoDisposeListener(Listenable listenable, [VoidCallback listener]) {
    _delegate.addAutoDisposeListener(listenable, listener ?? _refresh);
  }

  @override
  void autoDispose(StreamSubscription subscription) {
    _delegate.autoDispose(subscription);
  }

  @override
  void cancel() {
    _delegate.cancel();
  }
}

这个混入里,利用 Disposer这个代理操作类管理订阅。下面是使用:

class _AutoPageState extends State<AutoPage> with AutoDisposeMixin {
  int _num = 0;
  int _value = 0;

  @override
  void initState() {
    super.initState();
    _addDataListener();
    _addValueListener();
  }

  _addDataListener() {
    autoDispose(DataModel().stream.listen((event) {
      print("==AutoPage._addDataListener: event==$event");
      if (mounted) {
        setState(() {
          _num = event;
        });
      }
    }));
  }

  _addValueListener() {
    addAutoDisposeListener(DataModel().valueNotifier, () {
      var value = DataModel().valueNotifier.value;
      print("==AutoPage._addValueListener: event==$value");
      if (mounted) {
        setState(() {
          _value = value;
        });
      }
    });
  }

  @override
  void dispose() {
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('AutoPage'),
      ),
      body: Center(
        child: Column(
          children: <Widget>[
            Text('接收到stream的数据是:$_num'),
            Text('接收到ValueNotifier的数据是:$_value')
          ],
        ),
      ),
    );
  }
}

和 page1 一样的逻辑,只是在订阅的时候多了一个autoDisposeaddAutoDisposeListener,进入 autopage 再去page2 ,发送数据,并没有打印,说明取消成功。

当然,并不是所有的通信都是全局的,有的是固定的组件内,作用域只在当前组件,有的是在vm层,并不在组件层,这时候我们就需要像 mvp 里的 presenter 或者 mvvm 架构里的 vm 一样,暴露 dispose 方法,在 view 层调用。同样我们也实现了一个混入:

mixin AutoDisposeControllerMixin on DisposableController implements Disposer {
  final Disposer _delegate = Disposer();

  @override
  void dispose() {
    cancel();
    super.dispose();
  }

  @override
  void addAutoDisposeListener(Listenable listenable, [VoidCallback listener]) {
    _delegate.addAutoDisposeListener(listenable, listener);
  }

  @override
  void autoDispose(StreamSubscription subscription) {
    _delegate.autoDispose(subscription);
  }

  @override
  void cancel() {
    _delegate.cancel();
  }
}

使用的时候继承DisposableController并混入 AutoDisposeControllerMixin即可,在需要取消订阅的地方,调用 dispose就能实现取消订阅。

代码

参考:(异步编程:使用 stream)[dart.dev/tutorials/l…]