RxSwift高阶函数之combineLatest

2,574 阅读3分钟

combineLatest 操作符将多个 Observables 中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables 中任何一个发出一个元素,他都会发出一个元素(前提是,这些 Observables 曾经发出过元素)。

示例:

let first = PublishSubject<String>()
let second = PublishSubject<String>()
        
Observable.combineLatest(first, second) {$0 + $1}
    .subscribe(onNext: { print($0)})
    .disposed(by: disposeBag)

first.onNext("0")       
first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")
first.onNext("5")
打印结果:
1A
2A
2B
2C
2D
3D
4D
5D
结果分析:
combineLatest函数需要两个序列都有信号,
并且将两个序列最近的的信号通过组合方法组合成一个信号,再发送给订阅者。

combineLatest 源码分析

查看combineLatest源码。

发现这只有一个协议函数的声明。老规矩,根据注释查找到函数的真正实现。因为示例中使用的是两个序列的组合,所以应该查看CombineLatest+arity.swift文件。

找到 combineLatest 函数。

会发现 combineLatest 函数是初始化了 CombineLatest2 对象。
查看 CombineLatest2 对象的初始化方法,其中保存了作为参数传递过来的两个序列和组合方法。

因为 CombineLatest2 对象继承自 Producer。所以按照之前RxSwift核心逻辑简介的分析,最终会执行 run 函数。

run 函数中,又初始化了 CombineLatestSink2_ 对象。

并且在初始化方法中调用了父类的初始化方法,同时给 arity 参数传值为 2,表示有 2 个可观察序列。

CombineLatest2.run 函数中初始化CombineLatestSink2_ 成功后,调用了 CombineLatestSink2_run 函数。

run 函数中,初始化了两个组合观察者 CombineLatestObserver。同时让两个源序列 subscribe 两个观察者。

参照RxSwift核心逻辑简介中的分析,将CombineLatestObserver 类比为 AnyObserver 对象。 最终会调用 CombineLatestObserveron 函数。

最后会调用 self._parent.next(self._index) 。 就会调用到 CombineLatestSink2_ 对象的 next 函数。但是通过源码可以看到,CombineLatestSink2_ 对象并没有 next 函数,所以我们查找其父类 CombineLatestSink。 在父类中找到 next 函数。

func next(_ index: Int) {
    if !self._hasValue[index] {
        self._hasValue[index] = true
        self._numberOfValues += 1
    }

    if self._numberOfValues == self._arity {
        do {
            let result = try self.getResult()
            self.forwardOn(.next(result))
        }
        catch let e {
            self.forwardOn(.error(e))
            self.dispose()
        }
    }
    else {
        var allOthersDone = true

        for i in 0 ..< self._arity {
            if i != index && !self._isDone[i] {
                allOthersDone = false
                break
            }
        }
            
        if allOthersDone {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

这其中,就蕴含着 combineLatest 函数的关键。

  • self._arity 的值为2
  • self._hasValue 是在初始化时创建的一个2个元素均为false的数组。
  1. 当第0个序列发送第1个信号时。self._hasValue[0] 为false。self._hasValue[0] 赋值为true,同时 self._numberOfValues 的值加1。当前 self._numberOfValues值为1。判断 self._numberOfValues == self._arity 值为false,所以不会调用 self.forwardOn发送组合信号。
  2. 当第0个序列发送第2个信号时。 self._hasValue[0] 为true。self._numberOfValues 的值不会加1。当前 self._numberOfValues值为1。判断 self._numberOfValues == self._arity 值为false,所以仍然不会调用 self.forwardOn发送组合信号。
  3. 当发送第1个序列的第1个信号时。self._hasValue[1] 为false。self._hasValue[1] 赋值为true,同时 self._numberOfValues 的值加1。当前 self._numberOfValues值为2。判断 self._numberOfValues == self._arity 值为true,所以会调用 getResult() 函数,并且得到组合后的信号作为 self.forwardOn函数的参数发送出去。 所以会发送第一个组合信号。
  4. 当发送第0个序列的第3个信号时。self._hasValue[0] 为true。self._numberOfValues 的值不会加1。当前 self._numberOfValues值为2。判断 self._numberOfValues == self._arity 值为true,所以会调用 getResult() 函数,并且得到组合后的信号作为 self.forwardOn函数的参数发送出去。 所以会发送第一个组合信号。

以此类推。直到所有序列发送完所有的信号。

以上分析则是 combineLatest 函数的底层原理。若有不足之处,请评论指正。

最后,因为文字叙述可能不是很直观,所以附上图解,小伙伴们可以参照图解更直观的理解 combineLatest函数。