mobx源码分析(二) 订阅响应式数据

209 阅读13分钟
原文链接: zhuanlan.zhihu.com

1 前言

处理问题有两种方式,从一般到具体,或者从具体到一般。在写作这两篇文章时,我选择了后者,一是因为我在写作的过程中才逐渐摸清 mobx 的设计和实现,二是因为我自身尚不具备足够的积淀,能站在更高的抽象维度对问题的一般面加以思索。前文对各种数据结构的处理无疑都是具体的,本文将介入抽象 observable,即对前文中的公有特征 reportObserved, reportChanged 方法加以萃取。

本文也将致力于解答前文遗留的问题,怎样使观察者订阅响应式数据的变更。

总而言之,本文将串联 mobx 的核心运作机制,从执行动作 action 促使响应式数据 observable 变更,再到衍生 derivation 执行的整个过程。

2 概念

首先,我们先来看一下 mobx 设计的几种抽象概念:

  1. observable: 响应式数据,最主要的特征是将数据变更信息上报给全局监听器。mobx 使用 IObservable 接口,Atom 类加以抽象。
  2. derivation: 衍生,响应式数据变更后执行的副作用函数,包含计算属性、反应。mobx 使用 IDerivation 接口,ComputedValue 类、Reaction 类加以抽象。
  3. action: 动作,由其促使响应式数据发生变更。上一篇文章已指出,使用 observableValue 实例的 set 方法,就能促使响应式数据发生变更,action 的意义在于,使用 startBatch, endBatch 事务封装执行动作,将一组响应式数据变更复合为一,等到这组响应式数据变更完结后,才执行 derivation。在 mobx 中,action 比较单薄,因为最要紧的——将响应式数据变更上报到全局环境——这一机制由 observable 完成。在这篇文章中将不作赘述。

2.1 observable

对于 observable,所需考虑的是,响应式数据变更会引起指定的观察者执行其处理逻辑;当响应式数据没有指定观察者时,数据变更就不会引起衍生的执行。为此,mobx 抽象的 IObservable 接口含有如下属性及方法:

  1. observers 属性为 observable 绑定的观察者队列。
  2. lowestObserverState 属性为状态标识,用于标记数据是否被更新,需要执行相应的衍生。
  3. diffValue 属性用于实时更新 observable, observer 的依赖关系。
  4. onBecameUnobservered, onBecameObservered 钩子,分别在 observable 不被监听或被监听时得到调用。
  5. lastAcessedBy(最后消费 observable 的观察者 id)。
  6. isBeingObserved 属性用于使 onBecameObservered 钩子不被反复调用。
  7. isPendingUnobservation 属性用于使 onBecameUnobservered 钩子不被反复执行。

备注:与配置项相当,属性需要通过翻阅源码才能感知到它存在的价值。介于部分属性与核心流程无关,在这里将只作简要讨论或不作讨论。

实现 IObservable 接口的 Atom 类额外增加了 reportObserved, reportChanged 方法。顾名思义,当 observable 被观察时,需要显示调用 reportObserved 方法;当 observable 数据变更时,需要显示调用 reportChanged 方法。更多内容,参见前文 - mobx 源码分析(一)构造响应式数据

2.2 derivation

derivation 可以理解为实际消费 observable 的观察者,因此,本文和前文中的 observer 就是 derivation。在 mobx 实现中,observable, derivation 相互持有彼此的引用。IDerivation 接口含有如下属性及方法:

  1. observing 属性本次衍生在哪些响应式数据变更时执行。
  2. dependenciesState 属性为状态标识,用于标记本次衍生观察的数据是否已经改变,是否运行期处理逻辑。
  3. onBecomeStale 方法就是当观察数据变更时,运行的处理逻辑。
  4. newObserving 属性用于变更 observable, derivation 的依赖关系(在于观察者可改变观察的数据)。
  5. unboundDepsCount 属性用于统计本次衍生所观察的数据量,同 observable.diffValue 一样,目的都在于实时更新 observable, derivation 的依赖关系。
  6. runId 属性,由它构成 observable.lastAcessedBy 的值。
  7. isTracing 属性标记日志级别,以便在 onBecomeStale 方法执行前打印日志。

IDerivation 接口有两类实现,其一是作为反应的 Reaction 类,其二是作为计算属性的 ComputedValue 类。这两个类都实现了具体的 onBecomeStale 方法。reaction.onBecomeStale 方法的表现是在所有响应式数据变更完成后,再对相关的衍生执行批处理操作,当然,在同一个批处理周期内,不会再对由 reaction 引起的衍生加以处理,这些衍生需要等待下一个批处理周期。ComputedValue 的特别之处是,它既是衍生,又是响应式数据。因此,computedValue.onBecomeStale 方法的执行时机是在其他反应调用 onBecomeStale 过程中,重新获取计算属性的值。

3 运作机制

3.1 reportObserved

当响应式数据被衍生订阅时,将会执行 obsrvable.reportObserved 方法。在该方法的执行过程中,无他,就是针对当前执行的衍生调用其观察的响应式数据的 onBecameObserved 方法;或者将该 obsrvable 实例添加到 globalState.pendingUnobservations 数组中,等待事务结束时,执行 observable.onBecomeUnobserved 与 computedValue.suspend 方法。这里不作介绍。

3.2 reportChanged

结合上图,当响应式数据发生变更时,mobx 的处理机制为:

  1. 通过 observable.reportChanged 方法将响应式数据变更的信息上报到全局环境。
  2. observable.reportChanged 执行过程中,使用 startBatch, endBatch 函数将 propagateChange(observable) 包裹到事务处理周期中。mobx 中的事务通过 globalState.inBatch 计数器标识:startBatch 阶段,globalState.inBatch 加 1;endBatch 阶段,globalState.inBatch 减 1;当 globalState.inBatch 为 0 时,表示单个事务周期结束。事务的意义就在于将并行的响应式数据变更视为一组,在一组变更完成之后,才执行相应的衍生。
  3. 在一个事务处理周期中,首先通过 propagateChange(observable) 间接将 derivation 加入到 globalState.pendingReactions 队列中。该过程中通过调用 derivation.onBecameStale 方法实现。对于 reaction 反应,在事务执行期间,直接将 reaction 添加到 globalState.pendingReactions 队列;对于 computedValue 计算属性,间接将观察 computedValue 变更的 reaction 添加到 globalState.pendingReactions 队列。
  4. endBatch 阶段,通过调用 runReactions 函数遍历 globalState.pendingReactions 队列,执行 reaction.runReaction 方法。每个 reaction.runReaction 方法内部的执行逻辑中,包含 observer 依赖和状态更新,以及执行用户端处理逻辑。reaction.runReaction 方法的处理细节将在后文予以分析。
  5. 事务的尾端,将遍历 globalState.pendingUnobservations 数组,并调用 observable.onBecomeUnobserved 方法。对于计算属性,额外调用 computedValue.suspend 方法。目的是当没有观察者监听这些响应式数据变更时,就无需将数据变更上报到全局环境。
class Atom implements IAtom {
  public reportChanged() {
    startBatch()
    propagateChanged(this)
    endBatch()
  }
}

function startBatch() {
  globalState.inBatch++
}

function endBatch() {
  if (--globalState.inBatch === 0) {
    runReactions()
    const list = globalState.pendingUnobservations
    for (let i = 0; i < list.length; i++) {
      const observable = list[i]
      observable.isPendingUnobservation = false
      if (observable.observers.size === 0) {
        if (observable.isBeingObserved) {
          observable.isBeingObserved = false
          observable.onBecomeUnobserved()
        }
        if (observable instanceof ComputedValue) {
          observable.suspend()
        }
      }
    }
    globalState.pendingUnobservations = []
  }
}

function propagateChanged(observable: IObservable) {
  if (observable.lowestObserverState === IDerivationState.STALE) return
  observable.lowestObserverState = IDerivationState.STALE

  observable.observers.forEach(d => {
    if (d.dependenciesState === IDerivationState.UP_TO_DATE) {
      d.onBecomeStale()
    }
    d.dependenciesState = IDerivationState.STALE
  })
}

function propagateMaybeChanged(observable: IObservable) {
  if (observable.lowestObserverState !== IDerivationState.UP_TO_DATE) return
  observable.lowestObserverState = IDerivationState.POSSIBLY_STALE

  observable.observers.forEach(d => {
    if (d.dependenciesState === IDerivationState.UP_TO_DATE) {
      d.dependenciesState = IDerivationState.POSSIBLY_STALE
      d.onBecomeStale()
    }
  })
}

class ComputedValue<T> implements IObservable, IComputedValue<T>, IDerivation {
  onBecomeStale() {
    propagateMaybeChanged(this)
  }
}

class Reaction implements IDerivation, IReactionPublic {
  onBecomeStale() {
    this.schedule()
  }

  schedule() {
    if (!this._isScheduled) {
      this._isScheduled = true
      globalState.pendingReactions.push(this)
      runReactions()
    }
  }
}

let reactionScheduler: (fn: () => void) => void = f => f()

export function runReactions() {
  if (globalState.inBatch > 0 || globalState.isRunningReactions) return
  reactionScheduler(runReactionsHelper)
}

function runReactionsHelper() {
  globalState.isRunningReactions = true
  const allReactions = globalState.pendingReactions
  let iterations = 0

  while (allReactions.length > 0) {
    let remainingReactions = allReactions.splice(0)
    for (let i = 0, l = remainingReactions.length; i < l; i++)
      remainingReactions[i].runReaction()
  }
  globalState.isRunningReactions = false
}

以上代码不但没有看见计算属性重新取值的实现,而且高度依赖于 observable 和 observer 的依赖关系和各自的状态标识。因此,我们会有以下两个问题:

  1. reaction.runReaction 方法到底是怎样实现的?reaction 和 computedValue 又怎样相互影响?
  2. mobx 怎样更新及维护 observable 和 observer 的依赖关系和状态标识?

3.2.1 runReaction

结合上图,reaction.runReaction 针对以下两种情况作出处理:

  1. 当响应式数据引起的反应内部没有计算属性时,重新执行反应的处理逻辑。
  2. 当响应式数据引起的反应内部有计算属性时,且计算属性观察的数据改变时,通过 computedValue 方法重新获取计算属性的值,再度开启事务(在 reaction.runReaction 方法执行过程中,使用 startBatch 函数开启)的意义在于等待计算属性的重新计算。其他情况下使用原有的值。

需要留神的是,当 reaction.runReaction 方法使用 startBatch 开启事务时,globalState.isRunningReactions 仍旧为真值,也就不会造成 globalState.pendingReactions 中未作处理的 reaction 被反复执行。

class Reaction implements IDerivation, IReactionPublic {
  runReaction() {
    if (!this.isDisposed) {
      startBatch()
      this._isScheduled = false
      if (shouldCompute(this)) {
        // 用户实际注册的监听函数经包装后将以 reaction.onInvalidate 形式呈现
        this.onInvalidate()
      }
      endBatch()
    }
  }
}

function shouldCompute(derivation: IDerivation): boolean {
  switch (derivation.dependenciesState) {
    case IDerivationState.UP_TO_DATE:
      return false
    case IDerivationState.NOT_TRACKING:
    case IDerivationState.STALE:
      return true
    case IDerivationState.POSSIBLY_STALE: {
      const prevUntracked = untrackedStart()
      const obs = derivation.observing,
        l = obs.length
      for (let i = 0; i < l; i++) {
        const obj = obs[i]
        if (isComputedValue(obj)) {
          obj.get()

          if ((derivation.dependenciesState as any) === IDerivationState.STALE) {
            untrackedEnd(prevUntracked)
            return true
          }
        }
      }
      changeDependenciesStateTo0(derivation)
      untrackedEnd(prevUntracked)
      return false
    }
  }
}

class ComputedValue<T> implements IObservable, IComputedValue<T>, IDerivation {
  public get(): T {
    if (this.keepAlive && this.firstGet) {
      this.firstGet = false
      autorun(() => this.get())
    }

    // 初始化获取绑定计算属性的依赖关系,或者在 action 中直接获取计算属性
    if (globalState.inBatch === 0 && this.observers.size === 0) {
      if (shouldCompute(this)) {
        this.warnAboutUntrackedRead()
        startBatch()
        this.value = this.computeValue(false)
        endBatch()
      }

    // reaction.runReaction 处理逻辑中,将进入第二个条件分支
    } else {
      reportObserved(this)
      if (shouldCompute(this)) if (this.trackAndCompute()) propagateChangeConfirmed(this)
    }
    const result = this.value!
    return result
  }

  computeValue(track: boolean) {
    globalState.computationDepth++
    let res: T | CaughtException
    if (track) {
      res = trackDerivedFunction(this, this.derivation, this.scope)
    } else {
      res = this.derivation.call(this.scope)
    }
    globalState.computationDepth--
    return res
  }

  private trackAndCompute(): boolean {
    const oldValue = this.value
    const wasSuspended = this.dependenciesState === IDerivationState.NOT_TRACKING
    const newValue = this.computeValue(true)

    const changed = wasSuspended || !this.equals(oldValue, newValue)
    if (changed) this.value = newValue
    return changed
  }
}

export function propagateChangeConfirmed(observable: IObservable) {
  if (observable.lowestObserverState === IDerivationState.STALE) return
  observable.lowestObserverState = IDerivationState.STALE

  observable.observers.forEach(d => {
    if (d.dependenciesState === IDerivationState.POSSIBLY_STALE)
      d.dependenciesState = IDerivationState.STALE
    else if (
      d.dependenciesState === IDerivationState.UP_TO_DATE
    )
      observable.lowestObserverState = IDerivationState.UP_TO_DATE
  })
}

对于 trackDerivedFunction 函数的处理逻辑,可参见下一小节。

3.2.2 依赖更新

我们先来看一下 mobx 为 observable, observer 提供的状态值:

  1. IDerivationState.NOT_TRACKING:值为 -1,作为 derivation 的初始状态。当衍生不再订阅响应式数据时,derivation.dependenciesState 值也将被置为 NOT_TRACKING。
  2. IDerivationState.UP_TO_DATE:值为 0,当响应式数据变更且衍生有执行时,derivation.dependenciesState 状态将被置为 UP_TO_DATE。
  3. IDerivationState.POSSIBLY_STALE:值为 1,计算属性变更时,订阅计算属性的衍生状态将置为 POSSIBLY_STALE。若在 shouldCompute 函数执行环节,当确认计算属性的值未作变更时,derivation.dependenciesState 状态将被重置为 UP_TO_DATE;若作变更,状态将置为 STALE。
  4. IDerivationState.STALE:值为 2,当衍生订阅的响应式数据或计算属性变更时,derivation.dependenciesState 状态将被置为 STALE,意味着衍生的逻辑需要重新启动。

状态标识更新流程为:

  1. 当初次添加 derivation 时,状态标识置为 NOT_TRACKING。
  2. 当响应式数据更新,监听这个响应式数据的衍生包含 reaction,则将该 reaction 的状态置为 STALE;包含 computedValue,则将该 computedValue 状态置为 STALE,并通过 computedValue.onBecameStale 方法将订阅这个计算属性的反应 reaction 的状态置为 POSSIBLY_STALE。
  3. 在事务 endBatch 阶段,在 reaction.runReaction 执行过程刷新 reaction 和 observable 的绑定关系,并将 reaction 的状态标识置为 NOT_TRACKING(当用户端处理逻辑执行过程中,reaction 订阅了新的 observable 时) 或 UP_TO_DATE。若 reaction 还订阅了计算属性,则调用计算属性 computedValue.get 方法,通过这个方法的执行,刷新 computedValue 和 observable 的关系,并将其状态标识置为 NOT_TRACKING 或 UP_TO_DATE。

第 2 步的执行逻辑参见上文给出的 propagateChanged, propagatedMaybeChanged 函数;第 1, 3 两步均通过 trackDerivedFunction 函数实现。

function trackDerivedFunction<T>(derivation: IDerivation, f: () => T, context: any) {
  changeDependenciesStateTo0(derivation)
  derivation.newObserving = new Array(derivation.observing.length + 100)
  derivation.unboundDepsCount = 0
  derivation.runId = ++globalState.runId
  const prevTracking = globalState.trackingDerivation
  globalState.trackingDerivation = derivation
  let result
  if (globalState.disableErrorBoundaries === true) {
    result = f.call(context)
  } else {
    try {
      result = f.call(context)
    } catch (e) {
      result = new CaughtException(e)
    }
  }
  globalState.trackingDerivation = prevTracking
  bindDependencies(derivation)
  return result
}

// 刷新 derivation 和 observable 的依赖关系,并将 derivation 的状态标识置为 UP_TO_DATE 或 NOT_TRACKING
function bindDependencies(derivation: IDerivation) {
  const prevObserving = derivation.observing
  const observing = (derivation.observing = derivation.newObserving!)
  let lowestNewObservingDerivationState = IDerivationState.UP_TO_DATE

  let i0 = 0,
    l = derivation.unboundDepsCount
  for (let i = 0; i < l; i++) {
    const dep = observing[i]
    if (dep.diffValue === 0) {
      dep.diffValue = 1
      if (i0 !== i) observing[i0] = dep
      i0++
    }

    if (((dep as any) as IDerivation).dependenciesState > lowestNewObservingDerivationState) {
      lowestNewObservingDerivationState = ((dep as any) as IDerivation).dependenciesState
    }
  }
  observing.length = i0

  derivation.newObserving = null

  l = prevObserving.length
  while (l--) {
    const dep = prevObserving[l]
    if (dep.diffValue === 0) {
      removeObserver(dep, derivation)
    }
    dep.diffValue = 0
  }

  while (i0--) {
    const dep = observing[i0]
    if (dep.diffValue === 1) {
      dep.diffValue = 0
      addObserver(dep, derivation)
    }
  }

  // 对于新添加的观察数据,将 derivation 添加 globalState.pendingReactions 中,在当前事务周期中处理
  if (lowestNewObservingDerivationState !== IDerivationState.UP_TO_DATE) {
    derivation.dependenciesState = lowestNewObservingDerivationState
    derivation.onBecomeStale()
  }
}

function changeDependenciesStateTo0(derivation: IDerivation) {
  if (derivation.dependenciesState === IDerivationState.UP_TO_DATE) return
  derivation.dependenciesState = IDerivationState.UP_TO_DATE

  const obs = derivation.observing
  let i = obs.length
  while (i--) obs[i].lowestObserverState = IDerivationState.UP_TO_DATE
}

trackDerivedFunction 函数在 mobx 中的实际调用过程为:

  1. Reaction 构造函数提供了 track 实例方法。该实例方法执行过程中,将调用 trackDerivedFunction 函数更新 derivation 的状态和依赖关系,同时执行传参 fn 用户端执行逻辑。实际在构造 Reaction 过程中,用户端执行逻辑将经由 reaction.track 封装后构成 reaction.onInvalidate 方法,该方法将在 reaction.runReaction 执行过程中得到调用。这样就解释了响应式数据变更时,既会处理用户端执行逻辑,如使视图重绘,又会促使 reaction 的状态值和依赖关系得到更新。
  2. ComputedValue 构造函数提供的 computeValue 实例方法,也会调用 trackDerivedFunction 函数。而 computedValue.get 实例方法将间接调用 computeValue 方法,从而使计算属性的状态得到更新。
class Reaction implements IDerivation, IReactionPublic {
  constructor(
    public name: string = "Reaction@" + getNextId(),
    private onInvalidate: () => void,
    private errorHandler?: (error: any, derivation: IDerivation) => void
  ) { }

  runReaction() {
    if (!this.isDisposed) {
      startBatch()
      this._isScheduled = false
      if (shouldCompute(this)) {
        // 用户实际注册的监听函数经包装后将以 reaction.onInvalidate 形式呈现
        this.onInvalidate()
      }
      endBatch()
    }
  }

  track(fn: () => void) {
    startBatch()
    this._isRunning = true
    const result = trackDerivedFunction(this, fn, undefined)
    this._isRunning = false
    this._isTrackPending = false
    if (this.isDisposed) {
      clearObserving(this)
    }
    endBatch()
  }
}

function autorun(
  view: (r: IReactionPublic) => any,
  opts: IAutorunOptions = EMPTY_OBJECT
): IReactionDisposer {
  const name: string = (opts && opts.name) || (view as any).name || "Autorun@" + getNextId()
  let reaction: Reaction

  reaction = new Reaction(
    name,
    function (this: Reaction) {
      this.track(reactionRunner)
    },
    opts.onError
  )

  function reactionRunner() {
    view(reaction)
  }

  // 初始化绑定 reaction 和 observable 的依赖关系,并调用用户端执行逻辑 view
  reaction.schedule()
  return reaction.getDisposer()
}

4. 接口层

接口层的主要目的是在响应式数据变更后自动调用用户端执行逻辑。

4.1 autoRun

autoRun 函数的处理流程为:

  1. 构建 Reaction 实例。
  2. 初始化调用 reaction.schedule 方法,初次调用用户端执行逻辑 ,绑定 observer 和 observable 的依赖关系。
  3. 使用 reaction.track 包装用户端执行逻辑,在响应式数据变更后,既负责更新 observer 和 observable 的依赖关系,又负责调用用户端执行逻辑。

用户端执行逻辑通常表现为视图变更,因此也被标识为 view 函数。

在调用 autoRun 时,可以通过选项 opts.delay 或 opts.scheduler 调度 view 的执行时机。完整代码如下:

function autorun(
  view: (r: IReactionPublic) => any,
  opts: IAutorunOptions = EMPTY_OBJECT
): IReactionDisposer {
  const name: string = (opts && opts.name) || (view as any).name || "Autorun@" + getNextId()
  const runSync = !opts.scheduler && !opts.delay
  let reaction: Reaction

  if (runSync) {
    reaction = new Reaction(
      name,
      function (this: Reaction) {
        this.track(reactionRunner)
      },
      opts.onError
    )
  } else {
    const scheduler = createSchedulerFromOptions(opts)
    let isScheduled = false

    reaction = new Reaction(
      name,
      () => {
        if (!isScheduled) {
          isScheduled = true
          scheduler(() => {
            isScheduled = false
            if (!reaction.isDisposed) reaction.track(reactionRunner)
          })
        }
      },
      opts.onError
    )
  }

  function reactionRunner() {
    view(reaction)
  }

  reaction.schedule()
  return reaction.getDisposer()
}

function createSchedulerFromOptions(opts: IReactionOptions) {
  return opts.scheduler
    ? opts.scheduler
    : opts.delay
      ? (f: Lambda) => setTimeout(f, opts.delay!)
      : run
}

4.2 reaction

reaction 函数的处理基本同 autoRun,其主要区别是,用户端执行逻辑表现为当首参计算函数返回的终值发生改变时,执行次参副作用逻辑。

function reaction<T>(
  expression: (r: IReactionPublic) => T,
  effect: (arg: T, r: IReactionPublic) => void,
  opts: IReactionOptions = EMPTY_OBJECT
): IReactionDisposer {
  const name = opts.name || "Reaction@" + getNextId()
  const effectAction = action(
    name,
    opts.onError ? wrapErrorHandler(opts.onError, effect) : effect
  )
  const runSync = !opts.scheduler && !opts.delay
  const scheduler = createSchedulerFromOptions(opts)

  let firstTime = true
  let isScheduled = false
  let value: T

  const equals = (opts as any).compareStructural
    ? comparer.structural
    : opts.equals || comparer.default

  const r = new Reaction(
    name,
    () => {
      if (firstTime || runSync) {
        reactionRunner()
      } else if (!isScheduled) {
        isScheduled = true
        scheduler!(reactionRunner)
      }
    },
    opts.onError
  )

  function reactionRunner() {
    isScheduled = false
    if (r.isDisposed) return
    let changed = false
    r.track(() => {
      const nextValue = expression(r)
      changed = firstTime || !equals(value, nextValue)
      value = nextValue
    })
    if (firstTime && opts.fireImmediately!) effectAction(value, r)
    if (!firstTime && (changed as boolean) === true) effectAction(value, r)
    if (firstTime) firstTime = false
  }

  r.schedule()
  return r.getDisposer()
}

4.3 when

when 函数的意义是在满足特定条件下,以动作 action 形式执行副作用函数,其实现内部调用了 autoRun 函数。when 函数分为同步版本和异步版本两个。并且,随着响应式数据的变更满足了特定的条件,在副作用函数执行之前,reactin 实例将被销毁,因此副作用函数将只执行一次。

function when(
  predicate: () => boolean,
  opts?: IWhenOptions
): Promise<void> & { cancel(): void }
function when(
  predicate: () => boolean,
  effect: Lambda,
  opts?: IWhenOptions
): IReactionDisposer
function when(predicate: any, arg1?: any, arg2?: any): any {
  if (arguments.length === 1 || (arg1 && typeof arg1 === "object"))
    return whenPromise(predicate, arg1)
  return _when(predicate, arg1, arg2 || {})
}

function _when(predicate: () => boolean, effect: Lambda, opts: IWhenOptions): IReactionDisposer {
  let timeoutHandle: any
  if (typeof opts.timeout === "number") {
    timeoutHandle = setTimeout(() => {
      if (!disposer[$mobx].isDisposed) {
        disposer()
        const error = new Error("WHEN_TIMEOUT")
        if (opts.onError) opts.onError(error)
        else throw error
      }
    }, opts.timeout)
  }

  opts.name = opts.name || "When@" + getNextId()
  const effectAction = createAction(opts.name + "-effect", effect as Function)
  const disposer = autorun(r => {
    if (predicate()) {
      r.dispose()
      if (timeoutHandle) clearTimeout(timeoutHandle)
      effectAction()
    }
  }, opts)
  return disposer
}

function whenPromise(
  predicate: () => boolean,
  opts?: IWhenOptions
): Promise<void> & { cancel(): void } {
  if (process.env.NODE_ENV !== "production" && opts && opts.onError)
    return fail(`the options 'onError' and 'promise' cannot be combined`)
  let cancel
  const res = new Promise((resolve, reject) => {
    let disposer = _when(predicate, resolve, { ...opts, onError: reject })
    cancel = () => {
      disposer()
      reject("WHEN_CANCELLED")
    }
  })
    ; (res as any).cancel = cancel
  return res as any
}

5 后记

mobx 是一个十分严谨的类库,这两篇文章点到的内容最多也不过十之六七。感兴趣的读者可以自行翻阅源码,想必能发现更多的矿藏,比如错误处理、信息追踪等。