RxSwift (二)序列核心逻辑分析

2,292 阅读15分钟

Rxswift(一)函数响应式编程思想

RxSwift (二)序列核心逻辑分析

RxSwift (三)Observable的创建,订阅,销毁

RxSwift(四)高阶函数

RxSwift(五)(Rxswift对比swift,oc用法)

Rxswift (六)销毁者Dispose源码分析

RxSwift(七)Rxswift对比swift用法

RxSwift (十) 基础使用篇 1- 序列,订阅,销毁

RxSwift学习之十二 (基础使用篇 3- UI控件扩展) @TOC

RxSwift序列核心逻辑

上一篇博客:Rxswift学习之(一)函数响应式编程思想只是简单的分析了序列的核心逻辑。本篇博客主要针对上一篇做一下更加深入的探讨,如果有那些地方分析有误,还请留言:QQ:282889543,让我们彼此提高,彼此成就。

总的来说分析Rxswift的核心逻辑还是按照三部曲:创建序列,订阅序列,销毁序列。核心思想是万物皆序列。

1. 序列的创建

Observable可观察者序列

我们先来看下创建Observable所涉及的类的继承关系: 如下图:

针对上面的类图,简单分析下类的关系和设计思想: 首先分层实施的很彻底,每一层都只解决一件事情,一层层叠起来结构非常清晰: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType

其次我们简单分解一下每个类都做了些什么:

  • ObservableConvertibleType:顾名思义即可转换为Observable 类型协议,方法只有一个asObservable,这有什么好处呢?
  1. 用户不需要关注其具体是哪个类型对象
  2. 让用户更多的关注其核心功能
  • ObservableType:也是个协议,继承了ObservableConvertibleType协议的asObservable,它提供抽象方法subscribe,即我们常说的订阅,只有外部订阅了该对象,才能真正实现对该对象进行观察。
  • Observable:真正的类,可以称之为元类,对于用户来说Observable 的功能是完整的,因为它已经具备了所有的用户所需要的功能,尽管有些方法并没有得到实现仍是抽象方法。 Producer: 它继承了Observable的所有方法,并实现subscribe 方法
  • AnonymousObservable:它继承了Producer的所有方法,并且增加了属性let _subscribeHandler: SubscribeHandler用来保存创建序列时传入的闭包,也就相对于拥有了调用这个序列的能力,此外它还实现run方法,这也是创建序列最核心关键的方法。在run()方法中它创建一个AnonymousObservableSink final private类的对象,而这个对象sink可以称之为管子,它类似于manager的角色,拥有序列和订阅,销毁能力。这里有两个疑惑:

问题1. AnonymousObservableSink为什么要定义成final private类,不能被继承,也不能被外部访问? 问题2. 创建的Observable是如何关联到订阅的?

这两个问题我们后面再分析。

最后,我们总结一下设计思想:

事实上用户所使用的 Observable ,都是 Producer 的子类和AnonymousObservable平行的子类,只不过用户不需要关心其具体实现罢了 每一个类似AnonymousObservable的类,还有一个与之相关的类AnonymousObservableSink,Sink即管道,所有这些组合起来才能让其真正运行起来,AnonymousObservableSink同时拥有序列,订阅功能,类似于我们项目中经常用的manager角色。 整个设计向上通过组合协议的方式描述其特性,向下通过子类化的方式屏蔽其实现细节,类似于工厂模式,这样的类也可以叫类簇。

序列创建的流程

通过上面类继承关系,其实我们不难理解序列的创建流程,它确实也是只有比较简单的几部,寥寥几行代码就搞定了,难点是上面抛出的几个问题:

下面我们将通过一个简单的Rxswift的实例来分析一下序列的创建,订阅,销毁直接的流程和关系。

实例1

    //1. 创建序列
   let ob = Observable<Any>.create { (obserber) -> Disposable in
            // 3:发送信号
            obserber.onNext("kyl发送了信号")
            obserber.onCompleted()
            return Disposables.create()
        }
    
        // 2:订阅信号
        let _ = ob.subscribe(onNext: { (text) in
            print("订阅到:\(text)")
        }, onError: { (error) in
            print("error: \(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("销毁")
        }

上面实例1 的这段代码可以用酷C老师的一个图来清晰的表达:

从上面的代码和关系图,我们可能会产生这样一个疑惑:

问题3: 创建的ob序列,仅仅只是通过ob.subscribe()订阅了一下,为什么我们在ob创建时的尾随闭包(我们这里给个名字叫闭包A)里面调用了obserber.onNext("kyl发送了信号")这个代码,我们就可以订阅到let _ = ob.subscribe(onNext: { (text) in print("订阅到:\(text)") } 这里会打印:”订阅到:kyl发送了信号“。我们没有看见他们之间有任何关联,怎么ob发送消息,subcribe()的onNext闭包就可以触发呢,这是为什么呢?

我们可以这里可以简单推理下:ob.subscribe()这个订阅方法肯定做了一些事情,在某个地方调用了闭包A,才能实现这个功能。具体是怎么样实现的呢?下面我们将通过分析源码来解答这个疑惑。

从上面的代码我们可以知道,创建序列就一行代码:let ob = Observable<Any>.create { (obserber) -> 而这一行代码其实是做了好多事情的。

首先我们通过一个流程图来初略的了解一下序列创建流程:

创建序列的Rxswift原码很简单,从上图可以看出,直接一行代码return AnonymousObservable(subscribe)就结束了,这里我们们并没有找到我们需要的答案,甚至我们有点越来越晕感觉。

  • AnonymousObservable类源码
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

我们先做个深呼吸,放轻松,此路不通那我们来尝试分析其他方向,不能在一棵树上吊死。下面我们来分析一下订阅的流程。

2.订阅

回顾上面实例1中的订阅代码:let _ = ob.subscribe(onNext: { (text) in这行代码又做了些什么事情呢?下面我们通过源码来深入分析一下:

  • Rxswift订阅subscribe()的源码如下:
  public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            
            ... 上面代码不是我们要分析的重点,...表示忽略了此次的一段源码
            /*注意,此次定义了一个AnonymousObserver()对象,以参数的形式,
            构造方法里面传入了一个尾随闭包eventHandler,
            在这个闭包里面,当收到event的不同事件,
            会触发并调用,我们 `let _ = ob.subscribe(onNext: { (text) in` 这个方法传入闭包
            */
            let observer = AnonymousObserver<E> { event in
          
                ...
                
                switch event {
                case .next(let value):
                    onNext?(value) //调用订阅时传入的ob.subscribe(onNext:闭包
                case .error(let error):
                    if let onError = onError {
                        onError(error)//调用订阅时传入的ob.subscribe(onError:闭包
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()//调用订阅时传入的ob.subscribe(onCompleted:闭包
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )/*这里直接返回了Disposables对象,用来释放资源,
            在它的构造函数里面直接调用了self.asObservable().subscribe(observer),
            而asObservable()就是我们创建的序列ob,也就是ob.subscribe(),
            并传入了,在这段代码里面创建的局部变量let observer = AnonymousObserver<E>,*/
    }

通过上面源码我们可以知道:subscribe()这个方法,以参数的形式传入了onNext()闭包,onError()闭包,onComplete()闭包,在函数里面创建了一个AnonymousObserver对象observer,这个对象创建的时候传入了一个闭包,当收到不同event事件时,会分别调用我们subscribe()传入的onNext,onError,onComplete这三个闭包。最重要一点是return Disposables.create( self.asObservable().subscribe(observer), disposable )这句代码调用了我们真正的subscribe()函数,并以参数的形式传入了AnonymousObserver对象,self.asObservable()就是我们create()函数创建的序列ob, 而到此处我们可以清晰的看到,我们订阅时传入参数闭包和我们创建的ob建立了一个链条。

这里我们又有一个疑问:self.asObservable()为什么就是我们create()函数返回的ob呢?

要解答这个问题,我需要回顾一下上面分析的Observable类的继承关系:Observable -> ObservableType -> ObservableConvertibleType 即Observable继承ObservableType协议,ObservableType又继承ObservableConvertibleType协议,而我们的ObservableConvertibleType提供了抽象方法asObservable(),我们Observable类中实现了asObservable()这个方法,它直接返回self就它自己。

下面通过源码来证实:

///
/// It represents a push style sequence.
public class Observable<Element> : ObservableType {

    ...
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    ...
}

分析了Rxswift订阅subscribe()的源码感觉非常nice, 我们找到了我们ob 创建时传入的闭包和我们订阅时的闭包存在了一条链条关系,也就是只要ob发送了消息,那我们的订阅者一定可以按照这个链条收到消息。但是我们还是不知道到底是怎么调用的,怎么触发的。

而且我们注意到self.asObservable().subscribe(observer)也就是AnonymousObservable调用了subscribe()方法,但是在AnonymousObservable类中我们并没有找到subscribe()的定义,所以我们需要来看他的父类Producer

  • Producer的源码如下:
class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            /*重点在这里了,这里调用了run()方法,一切疑惑都清晰了,我们知道了run()调用时传入了observer,并且创建了sink管子,而这个管子具备了序列的功能,可以调用on()方法。
            */
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

果然不出我们所料,在Producer中我们找到了subscribe()的方法定义,到此我们可以总结出很清晰的几条线索了

  • (1)通过前面的类继承关系可以知道是Producer实现了ObservableType协议的subscribe()方法。在这个方法里面调用了self.run(observer, cancel: disposer)
  • (2) self.run()实际上就是AnonymousObservable.run(), 这这个方法里面做了三件事情:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
//1.创建了一个sink管道对象,并将observer也就create()创建
//序列时传入的闭包传给了sink
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        //2. sink调用自己的run()方法,并把AnonymousObservable作为参数传入。
        let subscription = sink.run(self)
        //返回一个元组,包含sink管道信息。
        return (sink: sink, subscription: subscription)
    }
  • (3)AnonymousObservableSink类中run()方法中调用parent._subscribeHandler(AnyObserver(self)) 其中parent就是我们(2)中sink.run(self)传入的self,也就是AnonymousObservable对象;并且我们前面已经知道_subscribeHandler就是创建序列时保存的那个通过create()函数参数传入的 闭包:let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:发送信号 obserber.onNext("kyl发送了信号") obserber.onCompleted() return Disposables.create() }。 现在已经很清晰了parent._subscribeHandler(AnyObserver(self)) 执行闭包,这行代码就会调用obserber.onNext("kyl发送了信号")这个行代码。

  • 现在我们可以通过一个流程图来总结我们代码执行的流程:

上面的订阅序列流程分析:我们弄明白了从订阅序列到调用create()函数时传入的参数闭包调用的逻辑,但是这个闭包发送onNext()信号后,怎么到订阅消息的onNext()闭包我们还不是很清晰。因此我们需要分析AnonymousObserver

我们先来看下AnonymousObserver

  • AnonymousObserver源码定义如下:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    /*构造函数,保存了EventHandler尾随闭包*/
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._eventHandler = eventHandler
    }

    //覆写了onCore方法,调用了EventHandler闭包
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}

AnonymousObserver源码中我们并没有找到onNext()方法,那我们只能沿着它的继承链往上查找,这里需要了解一下类的继承关系:

  • AnonymousObserver的继承关系:

通过分析类的继承关系,我们得知:这样一个关系链:

AnonymousObserver对象的on()方法会调用onCore()方法,ObserverType里面有onNext,onError,onComplete方法。但是on()是如何调用的,何时调用的呢?

要解决这个疑问,我们需要再次回到我们创建序列的代码:

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }

创建序列的create()方法传入了一个subscribe闭包,并返回了AnonymousObservable对象。其中subscribe闭包就是我们序列创建时参数形式传入 闭包。并且AnonymousObservable初始化时将这个闭包保存起来了self._subscribeHandler = subscribeHandler AnonymousObservable 有一个run()方法,run方法里面创建了一个AnonymousObservableSink对象sink,具体源码如下:

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

分析了这么久,绕了一圈,终于发现关键就在AnonymousObservableSink管子这个对象里面了。sink这是个神奇的管子。它就保存了序列,也保存了订阅,还保存了用于销毁的disposed 也就是同时拥有了创建序列,订阅序列,销毁序列功能。

我们来分析下AnonymousObservableSink的源码:

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    //这里的Parent就是我们上面分析的AnonymousObservable,非常重要
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

// 构造方法,传入了observer序列,和Cancelable
    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

//这里实现 了ObserverType协议的on()方法
    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            //调用了父类的发布,self.forwardOn()会调用自己的on()方法
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
    /*调用了_subscribeHandler闭包,这个闭包就是我们之前创建序列时传入闭包。
    parent就是传入进来的序列,这里序列的闭包里传入了self并且强转为AnyObserver
    这里将self传给了闭包_subscribeHandler,这样_subscribeHandler也就具备了subcribe的能力。
    */
        return parent._subscribeHandler(AnyObserver(self))
    }
}

其中Sink类的源码如下:

class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        //这里调用了传入observer.on()方法,
        self._observer.on(event)
    }

    final func forwarder() -> SinkForward<O> {
        return SinkForward(forward: self)
    }

    final var disposed: Bool {
        return isFlagSet(self._disposed, 1)
    }

    func dispose() {
        fetchOr(self._disposed, 1)
        self._cancel.dispose()
    }

    deinit {
#if TRACE_RESOURCES
       _ =  Resources.decrementTotal()
#endif
    }
}

从源码分析我们得知:

  • 我们的sink保存了我们的序列,当我们调用ob.onNext()发送信号时,由于我们的sink已经持有了ob, 这样sink会调用on()方法,在on()方法里面会调用self.forwardOn(event),而在fowardOn()里面会调用self._observer.on(event)。这样我的疑问就解决了,答案就是sink调用了on()方法。

  • 这里我们再来总结一下总的流程:

  1. 创建序列时create()返回了一个ob, 这个ob就是序列,创建的时候传递了一个闭包A。在闭包A中调用了ob.onNext()发送了信号。
  2. 订阅序列时调用ob.subscribe()方法,这个方法会创建一个AnonymousObserver对象,并调用了self.asObservable().subscribe(observer)
  3. self.asObservable()实际就是我们的ob, 也就是ob调用了subscribe().而AnonymousObserver中没有找到subscribe()。
  4. 我们在AnonymousObserver的父类中找到了subscribe(),发现subscribe()调用了AnonymousObserver的run()方法。
  5. 在AnonymousObserver的run()方法中,创建了一个管子sink,并调用了sink.run(self),sink是AnonymousObservableSink的对象,而在sink的run()方法中parent._subscribeHandler(AnyObserver(self))调用了创建序列时保存的闭包A (parent就是AnonymousObserver),这样就解释了订阅时,回调了A闭包的原因。
  6. 至于怎么调用onNext()方法也是通过sink来实现的。
  7. sink已经持有了ob ,当我们在A闭包里面调用ob.onNext()发送信号时,实际会通过sink.on()来调用。首先sink.on()会调用forwardOn().
  8. 在forwardOn()中会调用self._observer.on(event)。
  9. _observer.on()会调用_observer.onCore()
  10. _observer.onCore(event)会根据event的类型判断是调用onNext(),onError(),onComplete()中间一个,由于我们传递的是onNext事件,所以会调用onNext() ,而这个_observer.onNext()会调用我们订阅时传入闭包subscribe(onNext:).
  11. 为什么回调的原因是:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            
            ... 上面代码不是我们要分析的重点,...表示忽略了此次的一段源码
            /*注意,此次定义了一个AnonymousObserver()对象,以参数的形式,
            构造方法里面传入了一个尾随闭包eventHandler,
            在这个闭包里面,当收到event的不同事件,
            会触发并调用,我们 `let _ = ob.subscribe(onNext: { (text) in` 这个方法传入闭包
            */
            let observer = AnonymousObserver<E> { event in
          
                ...
                
                switch event {
                case .next(let value):
                    onNext?(value) //调用订阅时传入的

这里调用ob.subscribe()的时候,我们创建了AnonymousObserver和我们subscribe()传入的onNext()闭包做了一个绑定,当AnonymousObserver.onNext()调用的时候必定会回调subscribe()传入的onNext()闭包。而10中的_observer对象指的就是let observer = AnonymousObserver

  • 还是通过这张图来解释最简洁:

3. 销毁

RxSwift给我们的展示的设计思维

iOS 常用设计模式