02.RxSwift - 核心逻辑源码分析

495 阅读4分钟
Observable 的流程
// 1: 创建序列
// Step 1: create the sequence. The closure is stored by the observable and
// is executed when the sequence is subscribed to.
let ob = Observable<String>.create { observer -> Disposable in
    // Step 3: emit a signal to the subscriber.
    observer.onNext("Cooci -  框架班级")
    // The returned disposable does not matter for this walkthrough.
    return Disposables.create()
}

// Step 2: subscribe to the sequence; the resulting disposable is discarded.
_ = ob.subscribe(onNext: { text in
    print("订阅到:\(text)")
})
  • 创建序列 - Observable<Any>.create
  • 订阅序列 - ob.subscribe
  • 发送信号 - observer.onNext() observer.onCompleted() observer.onError()
核心逻辑源码分析

几个主要类的继承结构,方便大家更好的理解:

1.Observable序列的创建
extension ObservableType {
    // MARK: create
    /**
     Creates an observable sequence from a specified subscribe method implementation.

     - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)

     - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
     - returns: The observable sequence with the specified implementation for the `subscribe` method.
     */
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        // The closure is not invoked here; it is only wrapped in an AnonymousObservable and returned.
        return AnonymousObservable(subscribe)
    }
}

/// Observable that stores a subscribe closure and hands it to a sink when `run` is called.
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    // The subscribe closure passed to `init`; kept for later invocation.
    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    // Wraps the observer in an AnonymousObservableSink, lets the sink run against this
    // observable, and returns both the sink and the disposable the sink's `run` produced.
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

  • 调用 create 方法时,创建并返回了一个内部对象 ob:AnonymousObservable
  • AnonymousObservable 内部保存了_subscribeHandler闭包
  • AnonymousObservable 继承了Producer,获得了subscribe功能
2.Observable序列的订阅
extension ObservableType {
    /**
     Subscribes optional element, error, completion and disposal handlers to the observable sequence.

     - parameter onNext: Called with the value of each `.next` event.
     - parameter onError: Called with the error of an `.error` event. When `nil`, the error is passed
       to `Hooks.defaultErrorHandler` together with the captured call stack.
     - parameter onCompleted: Called on a `.completed` event.
     - parameter onDisposed: Wrapped in a disposable via `Disposables.create(with:)`; that disposable is
       disposed after an error or completion event has been handled, and is part of the returned disposable.
     - returns: A disposable combining the source subscription and the `onDisposed` disposable.
     */
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
        let disposable: Disposable

        if let disposed = onDisposed {
            disposable = Disposables.create(with: disposed)
        }
        else {
            disposable = Disposables.create()
        }

        #if DEBUG
            let synchronizationTracker = SynchronizationTracker()
        #endif

        // The call stack is only captured when error call-stack recording is switched on.
        let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []

        // Internal observer: its event handler closure captures the caller's callbacks.
        let observer = AnonymousObserver<E> { event in

            #if DEBUG
                synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { synchronizationTracker.unregister() }
            #endif

            switch event {
            case .next(let value):
                onNext?(value)
            case .error(let error):
                if let onError = onError {
                    onError(error)
                }
                else {
                    Hooks.defaultErrorHandler(callStack, error)
                }
                // Error ends the handling for this subscription: dispose the onDisposed disposable.
                disposable.dispose()
            case .completed:
                onCompleted?()
                disposable.dispose()
            }
        }

        // Subscribe the internal observer to the source and bundle both disposables together.
        return Disposables.create(
            self.asObservable().subscribe(observer),
            disposable
        )
    }
}
  • ObservableType 拓展实现了subscribe
  • 创建了一个内部观察者AnonymousObserver对象,它这里的初始化是闭包参数,保存了外界的 onNext, onError , onCompleted , onDisposed 的处理回调闭包的调用
  • 调用self.asObservable()返回一个原序列(AnonymousObservable)对象:ob
  • 最后 ob 调用父类 Producer 的 subscribe() 方法,并把这个内部观察者 observer 带了过去

进入 Producer 的 subscribe() 方法:参数(observer)

/// Subscribes `observer` by calling `run` and wiring the resulting sink and subscription into a `SinkDisposer`.
///
/// - parameter observer: The observer that is passed on to `run`.
/// - returns: The `SinkDisposer` when `run` is executed directly; otherwise the value returned by
///   `CurrentThreadScheduler.instance.schedule`.
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        // Direct path: taken when CurrentThreadScheduler reports that no scheduling is required.
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            // `run` receives the disposer as its `cancel` argument and returns a (sink, subscription) pair.
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            // Scheduled path: the same three steps run inside the closure handed to the scheduler.
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
  • 这里主要关心self.run的调用,其他无关订阅流程的先不管
  • Producer调用run最终会定位到子类AnonymousObservable.run

回到AnonymousObservable.run

/// Creates a sink for `observer` and runs it against this observable.
///
/// - parameter observer: Observer passed to the sink's initializer.
/// - parameter cancel: Cancelable passed to the sink's initializer.
/// - returns: The sink together with the disposable returned by the sink's `run`.
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        // The observer goes into the sink at construction time; `run` itself only receives `self`.
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
  • 创建了中介者AnonymousObservableSink对象
  • 中介者对象调用run,并传入参数:self(AnonymousObservable本身);观察者observer已在初始化中介者时传入

进入AnonymousObservableSink

/// Sink placed between an `AnonymousObservable` and the subscribed observer.
/// It is itself an `ObserverType`: events sent to it are filtered by a stopped flag and then forwarded.
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>

    // state
    // 0 while events are still accepted; set to 1 by the first error/completed event.
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    // Passes both arguments straight through to the Sink initializer.
    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    // Handles an incoming event: `.next` is forwarded unless the stopped flag is 1;
    // `.error` / `.completed` are forwarded only when the flag was still 0, after which the sink is disposed.
    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            // NOTE(review): presumably fetchOr sets the flag and returns its previous value — confirm.
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
    // Invokes the parent's stored subscribe closure, passing this sink wrapped in an AnyObserver.
    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}
  • AnonymousObservable别名Parent
  • 初始化调用了父类Sink的初始化:super.init,并保存观察者_observer
  • 实现了on方法,用于处理next、error、completed三种事件
  • on内调用了forwardOn
  • run方法里的parent就是AnonymousObservable,这里就相当于AnonymousObservable._subscribeHandler,参数AnyObserver对象
  • AnyObserver通过传入self(AnonymousObservableSink本身)初始化发放信号对象

进入AnyObserver内:

/// Observer wrapper that stores another observer's `on` method as a function value.
/// (Excerpt: the `...` lines mark parts of the type omitted here.)
public struct AnyObserver<Element> : ObserverType {
    ...
    // Stores the wrapped observer's `on` method itself, not the observer object.
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
   ...
    // Forwards the event to the stored function.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
}
  • 把sink.on保存为observer(observer这里保存的是一个函数,并不是之前的观察者),即:AnonymousObservableSink.on
  • on方法内调用self.observer,即调用了AnonymousObservableSink.on
  • 到了这里最后执行了AnonymousObservableSink.on内部的forwardOn

这里需要注意:AnonymousObservableSink.run内部通过AnonymousObservable._subscribeHandler传入一个消息发送者AnyObserver,这里AnyObserver即我们create闭包中的observer

3.Observable序列发送信号
// 1. Create the sequence.
    let ob = Observable<Any>.create { (observer) -> Disposable in
       // 3. Emit signals.
        observer.onNext("RxSwift核心逻辑")
       // Alternative: call observer.onError(...) with a value whose type conforms to Error.
       // NOTE(review): the disabled draft line force-cast a String with `as! Error`, which would
       // trap at runtime unless String is made to conform to Error elsewhere — confirm before enabling.
       observer.onCompleted()
       return Disposables.create()
 }
  • 这里闭包中的observer本质就是AnyObserver
  • 当我们发送信息onNext, onCompleted, onError时,直接就来到ObserverType的拓展
  • 继承关系请看前边的继承图
/// Protocol for types that can receive events.
public protocol ObserverType {
    /// The element type carried by the events this observer accepts.
    associatedtype E
    /// Receives a single event.
    func on(_ event: Event<E>)
}

/// Convenience methods that wrap their argument in the matching `Event` case and pass it to `on`.
extension ObserverType {
    /// Sends `.next(element)` to `on`.
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
    /// Sends `.completed` to `on`.
    public func onCompleted() {
        self.on(.completed)
    }
    /// Sends `.error(error)` to `on`.
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

  • 内部调用self.on,本质就是调用(类的继承关系请看继承图)
  1. AnyObserver.on -> AnonymousObservableSink.on -> AnonymousObservableSink.forwardOn -> 通过父类调用
/// Handles an incoming event, forwarding it via `forwardOn` unless the stopped flag blocks it.
func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            // Drop the element when the stopped flag reads 1.
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            // Forward and dispose only when the flag was still 0.
            // NOTE(review): presumably fetchOr sets the flag and returns its previous value — confirm.
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
  2. Sink.forwardOn -> Sink._observer.on -> AnonymousObserver.on -> 通过父类调用
/// Passes `event` on to the stored `_observer`, unless the disposed-flag check below returns first.
final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        // NOTE(review): presumably true once this sink has been disposed — confirm against Sink.dispose.
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }

3.ObserverBase.on->AnonymousObserver.onCore->AnonymousObserver._eventHandler

/// Gates events with the stopped flag and passes the accepted ones to `onCore`.
func on(_ event: Event<E>) {
        switch event {
        case .next:
            // Elements are delivered only while the stopped flag reads 0.
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            // Delivered only when the flag was still 0 before this call.
            // NOTE(review): presumably fetchOr sets the flag and returns its previous value — confirm.
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    /// Passes the event to the stored `_eventHandler` closure.
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }

4.最后就通过AnonymousObserver创建时候的闭包,把onNext,onError,onCompleted回调出去了

总结关系图: