关于Dart中Future源码剖析

2,165 阅读3分钟
  • Future是个什么东西:Dart中实现异步操作的一种方式。
  • Future异步原理: 将构造函数中传入的Function置于eventQueque或microQueque中,由eventloop进行循环执行完成异步。
  • Future如何实现的:Timer,zone
  • Timer:eventQueque中包含各种外来事件,比如点击事件,页面绘制,timers等,而Future之所以能够将要处理的Function放入eventQueque中正是用Timer实现的。
  • zone:沙盒概念,代码的执行区域,可捕获Dart中try catch无法捕获的Future异步操作中发生的异常,并且可调用registerUnaryCallback注册回调方法,并用runUnart执行。

分析源码:

这次我们来分析下future的初始化过程,以及then是如何监听结果的返回的。

起点:构造函数

平时使用Future时就是调用它的构造函数,比如这样 Future(()=>print("1"));,看下它的构造函数:

factory Future(FutureOr<T> computation()) {
    _Future<T> result = new _Future<T>();
    Timer.run(() {
      try {
        result._complete(computation());
      } catch (e, s) {
        _completeWithErrorCallback(result, e, s);
      }
    });
    return result;
  }

可以看到是一个工厂构造函数,内部有两个重要的组成部分,

  • Timer:用于把事件放入eventQueque。
  • _Future:Future真正实例化的主体。

_Future

跟随着上面的流程,我们接下来看_Future中是什么:

1.首先是记录当前Future的状态类型

  //初始化状态
  static const int _stateIncomplete = 0;
  //等待完成状态
  static const int _statePendingComplete = 1;
  //多个Future的链状态
  static const int _stateChained = 2;
  //正常完成状态
  static const int _stateValue = 4;
  //异常完成状态
  static const int _stateError = 8;

2.之后是Zone,我们异步执行的代码就是在这个区域中执行的,每个Future实例都有自己的zone

final Zone _zone;

两个比较重要的方法: registerUnaryCallback是Zone中注册回调的方法 runUnary 执行回调方法

3._resultOrListeners

上个Future的执行结果

4.then方法

Future<R> then<R>(FutureOr<R> f(T value), {Function onError}) {
    Zone currentZone = Zone.current;
    if (!identical(currentZone, _rootZone)) {
      f = currentZone.registerUnaryCallback<FutureOr<R>, T>(f); //创建回调方法,要执行的方法f作为参数传入
      if (onError != null) {
        // In checked mode, this checks that onError is assignable to one of:
        //   dynamic Function(Object)
        //   dynamic Function(Object, StackTrace)
        onError = _registerErrorHandler(onError, currentZone);
      }
    }
    return _thenNoZoneRegistration<R>(f, onError);
  }

从上面的代码看到,f就是我们传入的希望执行的方法对象, 通过zone的registerUnaryCallback创建回调方法,

 Future<E> _thenNoZoneRegistration<E>(
      FutureOr<E> f(T value), Function onError) {
    _Future<E> result = new _Future<E>();
    _addListener(new _FutureListener<T, E>.then(result, f, onError));
    return result;
  }

上面的方法再次创建_Future对象,并且通过 _addListener 方法添加监听

void _addListener(_FutureListener listener) {
    assert(listener._nextListener == null);
    if (_mayAddListener) {
      listener._nextListener = _resultOrListeners;
      _resultOrListeners = listener;
    } else {
      if (_isChained) {
        // Delegate listeners to chained source future.
        // If the source is complete, instead copy its values and
        // drop the chaining.
        _Future source = _chainSource;
        if (!source._isComplete) {
          source._addListener(listener);
          return;
        }
        _cloneResult(source);
      }
      assert(_isComplete);
      // Handle late listeners asynchronously.
      _zone.scheduleMicrotask(() {
        _propagateToListeners(this, listener);
      });
    }
  }

这里需要注意 _mayAddListener 的赋值逻辑

bool get _mayAddListener => _state <= _statePendingComplete;

即开头提过的:初始化状态,或者等待完成这两个状态可以添加listener, 而无法添加listener的状态则调用zone的_zone.scheduleMicrotask方法去之后执行添加的行为。

那么刚刚添加进去的listener是在什么时候执行的呢,开头的构造方法中,我们调用了_Future 的 _complete方法

void _complete(FutureOr<T> value) {
    assert(!_isComplete);
    if (value is Future<T>) {
      if (value is _Future<T>) {
        _chainCoreFuture(value, this);
      } else {
        _chainForeignFuture(value, this);
      }
    } else {
      _FutureListener listeners = _removeListeners();
      _setValue(value);
      _propagateToListeners(this, listeners); //就是这里
    }
  }

我们继续看_propagateToListeners的代码,代码较长,只取比较重要的部分展示

 if (listener.handlesComplete) {
          handleWhenCompleteCallback();
        } else if (!hasError) {
          if (listener.handlesValue) {
            handleValueCallback(); //这里是正常流程的回调方法执行的位置
          }
        } else {
          if (listener.handlesError) {
            handleError();
          }
        }

继续看handleValueCallback

 void handleValueCallback() {
          try {
            listenerValueOrError = listener.handleValue(sourceResult);
          } catch (e, s) {
            listenerValueOrError = new AsyncError(e, s);
            listenerHasError = true;
          }
        }
        
 FutureOr<T> handleValue(S sourceResult) {
    return _zone.runUnary<FutureOr<T>, S>(_onValue, sourceResult);
  }

可以看到,最终我们调用了开头说到的zone的_zone.runUnary方法去执行我们传入的回调方法。 至此,整个Future的大体流程我们跟随着源码的逻辑大体梳理了一遍。

总结一下:1.Future是Time和Zone的封装 2.Future的then传入的事件并未放入到event队列中,而只是个回调方法 3.如果Future的状态是已经结束的状态,则再用这个实例去添加listener,即再调用then方法,直接将传入的事件放入到microQueque中处理。