RxSwift之管道——AnonymousObservableSink

615 阅读4分钟

在上一篇RxSwift核心逻辑简介中已经简单分析了RxSwift的核心逻辑。

但是其中有一个点,解析得不是很清楚。那就是可观察序列和订阅者之间到底是如何联系在一起的?那么这一篇就详细分析一下链接两者的管道——AnonymousObservableSink。

之前已经知道了,第二步订阅信号以后,代码会来到这里。

run函数中,会初始化AnonymousObservableSink,同时传递参数observercancelcancel是垃圾处理器,处理内存回收,不是本篇的重点,先忽略。我们重点关注observer

1、AnonymousObservableSink <—> observer

先看看observer的来源。其是执行subscribe函数内部初始化的匿名观察者AnonymousObserver的实例对象。

其内部保存了订阅信号时传入的Event闭包。

这个实例对象在AnonymousObservableSink初始化时作为初始化参数传入。我们再来看AnonymousObservableSink类的源码。

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}

AnonymousObservableSink在初始化时调用的是父类的初始化方法,我们再找到其父类的源码。

class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }

    final func forwarder() -> SinkForward<O> {
        return SinkForward(forward: self)
    }

    final var disposed: Bool {
        return isFlagSet(self._disposed, 1)
    }

    func dispose() {
        fetchOr(self._disposed, 1)
        self._cancel.dispose()
    }

    deinit {
#if TRACE_RESOURCES
       _ =  Resources.decrementTotal()
#endif
    }
}

通过父类的初始化方法可以看出,observer对象最终被保存在Sink类的_observer变量中。

因此子类AnonymousObservableSink就拥有了观察者observer

2、AnonymousObservableSink <—> 可观察序列

AnonymousObservableSink被初始化后,会执行sink.run(self)函数,这里的self即为第一步创建可观察序列中,创建的可观察序列。

通过run函数,AnonymousObservableSink与可观察序列之间就建立起了联系。

3、observer.onNext()中的observer到底是什么?

我们知道,在第三步==发送信号==会执行observer.onNext()函数。这个observer会是我们创建的可观察序列吗?我们来分析一下源码。

AnonymousObservableSink对象执行run函数时

func run(_ parent: Parent) -> Disposable {
    return parent._subscribeHandler(AnyObserver(self))
}

会调用parent_subscribeHandler。在上一篇中已经分析了_subscribeHandler,就是第一步创建可观察序列时传入的闭包。而给闭包返回的参数是一个AnyObserver(self)。所以,observer.onNext()中的observer不是我们创建的可观察序列,而是AnyObserver

查看源码。

public struct AnyObserver<Element> : ObserverType {
    /// The type of elements in sequence that observer can observe.
    public typealias E = Element
    
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<E> {
        return self
    }
}

AnyObserver在初始化时,执行的是self.observer = observer.on这是什么意思呢?

前面已经分析了。AnyObserverinit时传入AnyObserver的是AnonymousObservableSink的实例对象。那么observer.on其实是AnonymousObservableSink.on,而通过上面的AnonymousObservableSink源码可以知道,on其实是一个函数。所以,AnyObserver中的observer是一个函数。

既然observer.onNext()中的observer不是可观察序列,而是AnyObserver对象,那么第三步发送信号到底是如何执行的呢?

4、observer.onNext()分析

点击onNext()查看源码。

其实调用的是ObserverTypeself.on(.next(element))函数。但是ObserverType是一个协议,只有函数声明,没有实现,所以,最后调用的是AnyObserver.on(.next(element))函数。

从上面AnyObserver的源码可以看出。on函数,最终调用的其实是self.observer(event),而在上一节的分析中,我们已经知道。self.observer其实保存的是AnonymousObservableSink中的on函数。

所以最后会回到AnonymousObservableSink中调用self.forwardOn(event)。而AnonymousObservableSink中并没有forwardOn函数。那么老规矩,当前类中没有的方法,那么就去找它的父类Sink

在父类Sink中找到forwardOn()函数,函数中执行了self._observer.on(event)。我们之前已经分析过了,self._observer其实就是订阅信号时创建的匿名观察者AnonymousObserver

所以,最后会调用AnonymousObserver.on。但是AnonymousObserver没有on()函数。所以查找其父类ObserverBase

class ObserverBase<ElementType> : Disposable, ObserverType {
    typealias E = ElementType

    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<E>) {
        rxAbstractMethod()
    }

    func dispose() {
        fetchOr(self._isStopped, 1)
    }
}

分析发现,最后仍然会回到子类AnonymousObserver并且调用其onCore()函数。

onCore()函数中执行的是self._eventHandler(event)。在上一篇中已经分析了self._eventHandler就是订阅信号时传入的闭包。

所以,经过一系列的数据传递,observer.onNext(),最终会回调到订阅信号时传入的闭包中。以此完成数据的传递和流转。