Rxswift (六)销毁者Dispose源码分析

1,842 阅读28分钟

Rxswift(一)函数响应式编程思想

RxSwift (二)序列核心逻辑分析

RxSwift (三)Observable的创建,订阅,销毁

RxSwift(四)高阶函数

RxSwift(五)(Rxswift对比swift,oc用法)

Rxswift (六)销毁者Dispose源码分析

RxSwift(七)Rxswift对比swift用法

RxSwift (十) 基础使用篇 1- 序列,订阅,销毁

RxSwift学习之十二 (基础使用篇 3- UI控件扩展) @TOC

Rxswift销毁者Dispose简介

  1. 先通过一张思维导图初步了解一下销毁者Dispose它拥有什么,做了一些什么事情:
    image
  2. 本编文章主要是围绕上面这张图来展开,重点分析Dispose()是怎么销毁序列的。
  3. 从上图我们可以看出销毁者后的第一个根节点是dispose和disposeBag.那他们分别是什么呢?答案将在下面讲解。

Rxswift销毁者类和重要函数介绍

1. DisposeBag

1.1 DisposeBag是什么

RxSwift和RxCocoa还有一个额外的工具来辅助处理ARC和内存管理:即DisposeBag。这是Observer对象的一个虚拟”包”,当它们的父对象被释放时,这个虚拟包会被丢弃。 当带有DisposeBag属性的对象调用deinit()时,虚拟包将被清空,且每一个一次性(disposable)Observer会自动取消订阅它所观察的内容。这允许ARC像通常一样回收内存。 如果没有DisposeBag,会有两种结果:或者Observer会产生一个retain cycle,被无限期的绑定到被观察对象上;或者意外地被释放,导致程序崩溃。 所以要成为一个ARC的良民,记得设置Observable对象时,将它们添加到DisposeBag中。这样,它们才能被很好地清理掉。

当一个Observable(被观察者)被观察订阅后,就会产生一个Disposable实例,通过这个实例,我们就能进行资源的释放了。 对于RxSwift中资源的释放,也就是解除绑定、释放空间,有两种方法,分别是显式释放以及隐式释放:

  • 显式释放 可以让我们在代码中直接调用释放方法进行资源的释放 如下面的实例:
let dispose = textField.rx_text
           .bindTo(label.rx_sayHelloObserver)
dispose.dispose()
  • 隐式释放 隐式释放则通过DisposeBag来进行,它类似于Objective-C ARC中的自动释放池机制,当我们创建了某个实例后,会被添加到所在线程的自动释放池中,而自动释放池会在一个RunLoop周期后进行池子的释放与重建;DisposeBag对于RxSwift就像自动释放池一样,我们把资源添加到DisposeBag中,让资源随着DisposeBag一起释放。

如下实例:

let disposeBag = DisposeBag()
func binding() {
       textField.rx_text
           .bindTo(label.rx_sayHelloObserver)
           .addDisposableTo(self.disposeBag)
}

上面代码中方法addDisposableTo会对DisposeBag进行弱引用,所以这个DisposeBag要被实例引用着,一般可作为实例的成员变量,当实例被销毁了,成员DisposeBag会跟着销毁,从而使得RxSwift在此实例上绑定的资源得到释放。

从上面的讲解我们大致明白了DisposeBag就像我们我们OC内存管理里的自动释放池。他充当了一个垃圾回收袋的角色,你只需把序列加入了disposeBag,disposeBag就会在合适的时候帮我们释放资源,那么它是怎么做到的呢?

1.2 DisposeBag的实现源码分析

1.2.1. 先看一下类图:

image

1.2.2. 具体分析源码流程

  1. 当我们调用disposed()方法的时候,会调用Dispose类的insert()方法,将销毁者dispose加入的_disposables数组中。 具体源码如下:
public final class DisposeBag: DisposeBase {
    
    private var _lock = SpinLock()
    
    // state
    fileprivate var _disposables = [Disposable]()
    fileprivate var _isDisposed = false
    
    /// Constructs new empty dispose bag.
    public override init() {
        super.init()
    }

    /// Adds `disposable` to be disposed when dispose bag is being deinited.
    ///
    /// - parameter disposable: Disposable to add.
    public func insert(_ disposable: Disposable) {
        self._insert(disposable)?.dispose()
    }
    
    private func _insert(_ disposable: Disposable) -> Disposable? {
        //这里为了为了防止多线程下出现抢占资源问题,需要加锁控制同步访问
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isDisposed {//判断如果调用过了_dispose()说明已经被释放过了,不需要再释放,保证对称性,则直接返回
            return disposable
        }
        //保存到数组中
        self._disposables.append(disposable)

        return nil
    }

    /// This is internal on purpose, take a look at `CompositeDisposable` instead.
    private func dispose() {
        // 1.获取到所有保存的销毁者
        let oldDisposables = self._dispose()

        // 2.遍历每个销毁者,掉用每一个销毁者的dispose()释放资源
        for disposable in oldDisposables {
            disposable.dispose()
        }
    }

    private func _dispose() -> [Disposable] {
        self._lock.lock(); defer { self._lock.unlock() }

        // 获取到所有保存的销毁者
        let disposables = self._disposables
        
        self._disposables.removeAll(keepingCapacity: false)
        self._isDisposed = true //这个变量用来记录是否垃圾袋数组被清空过
        
        return disposables
    }
    
    deinit {
        //当DisposeBag自身对象被销毁时,调用自己的dispose(),遍历销毁数组中所有保存的销毁者,
        self.dispose()
    }
}
  1. 上面的源码流程通过一个图来标识
    image
  2. 总结一下上面的DisposeBag处理流程:
  • 当我们调用序列的dispose()方法是,DisposeBag调用insert()方法将我们的需要销毁的序列保存起来存放在_disposables数组中。
  • 当我们的DisposeBag销毁时,如定义的局部变量出了作用域后,就会被销毁,首先会调用我们的deinit()方法 如上图4,在deinit()里面会执行自己的dispose()方法,然后变量之前保存的所有需要释放的_disposables数组,依次调用他们自己的dispose()方法。

2. fetchOr()函数

  1. fetchOr 函数的作用类似于标记,先来看一下fetchOr()函数的源码:
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.value |= mask
    this.unlock()
    return oldValue
}

源码很简单,但是作用不小。代码中this 是传入的AtomicInt值,其内部仅有一个value值。 fetchOr 先将 this.value copy一份,作为结果返回。而将 this.value 和 mask 做或 (|) 运算。并且将或运算的结果赋值给 this.value。

  1. 我们通过一个表来理解这个函数的执行结果:
this.value mask oldValue 或 运算后this.value 返回值
0 1 0 1 0
1 1 1 1 1
0 2 0 2 0
1 2 1 3 1

就是做了一次或运算,实际的10进制结果不变,只是改变了里面的二进制位,可以用来做标志位,只是C语言里面经常用的方法,即一个Int类型处理本身的值可以使用外,还可以通过按位与,或,来改变它的标志位,达到传递值的目的,这样每个位都可以取代一个bool类型,经常用作枚举。

运算符 二进制 十进制 说明
0000 0001 1
0000 0010 2
或运算 0000 0011 3
  1. 通过上面的分析,我得知 fetchOr ()函数的作用就是,可以确保每段代码只被执行一次,就相当于一个标志位,如果初始值为0 ,如果传入参数1,假设这段代码重复执行5次,只有第一次会从0变为1,后面四次调用都是为1,不会发送变化。

Dispose核心逻辑

Dispose 实例代码分析

  • 学过Rxswift的童鞋都知道dispose()调用后,会向我们oc里面的引用计数器一样,释放我们的资源。释放的时候我们还可以监听到被销毁的事件回调。那么有没有思考过Dispose是如何做到的呢?

要知道这个答案,我们只能通过源码来一步步分析:

  • 首先,我们来看一段实例代码:

实例1:

func limitObservable(){
        // 创建序列
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            return Disposables.create { print("销毁释放了")} // dispose.dispose()
        }
        // 序列订阅
        let dispose = ob.subscribe(onNext: { (anything) in
            print("订阅到了:\(anything)")
        }, onError: { (error) in
            print("订阅到了:\(error)")
        }, onCompleted: {
            print("完成了")
        }) {
            print("销毁回调")
        }
        print("执行完毕")
        //dispose.dispose()
    }
  1. 上面的代码执行结果如下:
    image
  2. 通过上面的结果我们知道,这个创建的序列没有被销毁,即没有打印“销毁释放了”,也没有打印“销毁回调”。这是为什么呢?这个问题我们后面再通过分析源码Rx源码就知道了。
  3. 现在我们把上面代码的那行注释放开dispose.dispose() 这行代码去掉注释,然后重新运行,输出结果如下:
    image
  4. 通过上面的代码我们看到了,创建的序列销毁了,销毁回调也执行了。那为什么加上了dispose.dispose() 就可以了呢?
  5. 此外我们再来修改一下我们的代码:

实例2:

func limitObservable(){
        // 创建序列
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            observer.onCompleted()
            return Disposables.create { print("销毁释放了")} // dispose.dispose()
        }
        // 序列订阅
        let dispose = ob.subscribe(onNext: { (anything) in
            print("订阅到了:\(anything)")
        }, onError: { (error) in
            print("订阅到了:\(error)")
        }, onCompleted: {
            print("完成了")
        }) {
            print("销毁回调")
        }
        print("执行完毕")
        //dispose.dispose()
    }

上面的实例2 的代码相对与实例1 就多了一行代码:observer.onCompleted() : 我们再来看一下输出结果:

image
这里我们可以看到我们多加了一行observer.onCompleted()代码后,就也打印了销毁回调,销毁释放了,这是什么逻辑呢? why?

  • 下面就让我们带着三个问题去探索一下Rxswift底层是如何实现的

Dispose 流程源码解析

再分析Dispose源码前,我们必须先深入理解序列的创建,订阅流程这个是基础,只有理解了这个,才能真正理解Dispose的原理。 这个其实在之前的博客已经分析过了,详情可以参考我之前的博客:序列核心逻辑

为了便于更好的理解,我在这里还再一次理一下具体的流程:

1. 序列创建,订阅流程

  • (1) 当我们执行代码let ob = Observable<Any>.create { (observer) -> Disposable in 这里面是一个闭包我们称为闭包A } 时,实际会来到Create.swift文件的第20行:
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
  • (2) create()函数返回一个AnonymousObservable(subscribe)对象,并将我们的闭包A传入到了AnonymousObservable的构造函数里面,AnonymousObservable将这个闭包A保存到了let _subscribeHandler: SubscribeHandler这个变量中存起来了。_subscribeHandler这个变量保存了序列ob创建时 传入的闭包A (其中闭包A要求传入AnyObserver类型作为参数)
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    //这个变量保存了序列ob创建时 传入的闭包A (其中闭包A要求传入AnyObserver类型作为参数)
    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler //这个变量保存了序列ob创建时 传入的闭包A
    }
    ...下面代码先不看省略掉
}
  • (3)我们调用let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)" } 这行代码进行序列ob的订阅操作,这行代码,我们跟进源码可以查看到:在ObservableType+Extensions.swift文件的第39行:
 public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            ... 此处代码先不分析省略
            
            let observer = AnonymousObserver<Element> { 
             这个里面是一个尾随闭包的内容这里先不分析
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
  • (4)从上面subscribe()的源码可以看到,在函数中创建了一个AnonymousObserver的对象,然后直接就return Disposables.create()结束了。

  • (5)这里我们并没有发现订阅和我们的闭包A有任何关系,那关键就在self.asObservable().subscribe(observer)这行代码里面了,我来分析一下这行代码到底做了些什么。

  • (6)我们要理解(5)中的这行代码,就需要先理解一下类的集成关系: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType 详情如下图:

    image

  • (7)通过继承关系,我们可以顺着继承链往上找父类,我们可以找到是在Observable类中定义了这个asObservable()方法:

public class Observable<Element> : ObservableType {
    ...此处省略不关注的代码
    
    public func asObservable() -> Observable<Element> {
        return self
    }
    
    ...此处省略不关注的代码
}

  • (8)通过源码分析,我得知 asObservable()就是返回self ,而(3)的代码调用是的self.asObservable().subscribe(observer) 这行代码的self就是我们创建的序列ob, 所以self.asObservable()返回的就是ob我们最开始创建的可观察序列。self.asObservable().subscribe(observer) 中的observer就是我们在public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) 方法实现中创建的局部变量:let observer = AnonymousObserver<Element> { 这个里面是一个尾随闭包的内容这里先不分析 } 我们将这个局部变量传入了Observable的subscribe()方法。
  • (9)接着我们就要分享Observable的subscribe()方法做了些什么了。
  • (10)当我们调用实例2中的这行代码:let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") } 的时候,实际调用了 ObservableType协议的subscribe()方法,在这个方法里面我们创建了一个AnonymousObserver对象,并通过self.asObservable().subscribe(observer) 传入了ob.susbscribe(observer) (注意:这里的ob就是我们create()创建的AnonymousObservable对象,而observer就是subscribe时创建临时局部AnonymousObserver对象,这些上面已经分析过了)。
  • (11)然而通过上面的类图,我们可以看到在ob(AnonymousObservable)类中并没有一个subscribe()的方法,那么我们只能先找它的父类Producer.
  • (12)通过前面的类图分析,可以知道Producer继承Observerable可观察序列,遵循了ObservableType协议(这个协议定义一个subscribe()接口),所以我们Producer中必定会实现这个接口。我来看一下源码:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            //下面这行代码是重点,调用了自己的run()方法,并传入了两个参数:
            //参数1:observer:就是我们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
            //参数2:disposer:就SinkDisposer()对象后将销毁会再分析。
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
  • (13)通过上面源码分析,我们得知Producer实现的subscribe()接口里面,调用了自己的run()方法,并在run()方法里面传入了observer:就是我们self.asObservable().subscribe(observer) 传入的AnonymousObserver对象。那接下来我看一下run()做了一些什么事情:
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
  • (14)从上面Producer中的run()方法,我们可以知道在这个方法并没有做任何事情,就一行rxAbstractMethod(),而这个rxAbstractMethod()只是一个抽象方法。那我们的子类AnonymousObservable中肯定覆写了run()方法。接下来我们再看一下AnonymousObservablerun()的源码:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        //创建了一个管子AnonymousObservableSink,并传给了管子两个参数:
         //参数1:observer:就是我们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
            //参数2:disposer:就SinkDisposer()对象后将销毁会再分析。
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
  • (15)在上面的run()源码中我们可以看到:在AnonymousObservablerun()方法中。首先,创建了一个AnonymousObservableSink对象sink,并将observer(也就是我们self.asObservable().subscribe(observer) 传入的AnonymousObserver对象)传入;其次,调用了sink.run(self) 方法返回了subscription,然后直接饭后一个元组,也就是run()方法返回了一个元组:(sink: sink, subscription: subscription)。 但是我们的重点是在sink管子上面。AnonymousObservableSink是一个类似于manager角色,它保存了序列,订阅者,销毁者三个信息,还具备调度能力。我们序列和订阅者就是通这个管子来做桥梁,实现通讯。
  • (16)接下来我们分析AnonymousObservableSink管子做了一些什么呢?我们来看一下AnonymousObservableSink的源码:
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    //这里给了一个别名:Parent就是AnonymousObservable序列
    typealias Parent = AnonymousObservable<Element>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: Observer, cancel: Cancelable) {
    //传入了observer:就是我们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
        super.init(observer: observer, cancel: cancel)
    }

func on(_ event: Event<Element>) {
    #if DEBUG
        self._synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { self._synchronizationTracker.unregister() }
    #endif
    switch event {
    case .next:
        if load(self._isStopped) == 1 {//如果已经执行过.error, .completed,就不会继续执行self.forwardOn(event)代码,意思就是只有对象生命周期内执行过.complete,.error事件,就不会再执行forwardOn,除非重新激活改变条件值。
            return
        }
        self.forwardOn(event)
    case .error, .completed:
    //fetchOr()这个方法上面已经讲解过,作用就是控制确保只会执行一次
        if fetchOr(self._isStopped, 1) == 0 {//如果从没有执行过就执行一次,否则不执行。以确保下面代码在对象生命周期内,无论on()调用多少次,都只会执行一次。
            self.forwardOn(event)
            self.dispose()
        }
    }
}

//这是一个很重要的方法,
    func run(_ parent: Parent) -> Disposable {
    //这里传入parent就是AnonymousObservable序列,也就是我们最开始create()序列ob,_subscribeHandler就是我们创建序列时传入的闭包A(闭包A就相对一个函数,要求传入一个参数,这个参数就是AnyObserver(self))
        return parent._subscribeHandler(AnyObserver(self))
    }
}
  • (17)通过上面AnonymousObservableSink的源码,我们得知有一下几点结论:
    • AnonymousObservableSink.init初始化时传入了observer:就是我们self.asObservable().subscribe(observer) 传入的AnonymousObserver对象。
    • AnonymousObservableSink有一个on()方法,这个方法根据传入的参数event做了不同的处理,但都会至少调用一次self.forwardOn(event)方法。每次如果onNext事件都会调用一次forwardOn()。但是.error, .completed事件最多只会调用一次forwardOn()
    • AnonymousObservableSink的run()方法是核心方法,是它会回调我们最开始create()创建时传递的闭包A,并将我们调用ob.subscribe()订阅时,函数内部创建的AnonymousObserver对象通过我们AnonymousObservableSink对象sink,也就是AnyObserver(self)中的self包装成一个AnyObserver结构体之后,作为参数传入闭包A,这样就将我们的序列和订阅者建立了联系。
    • **特别注意:**很多人认为传入我们闭包A 的就是AnonymousObserver 实际上不是,传入闭包A的时一个AnyObserver结构体
    • 通过AnonymousObservableSink的run()方法我们成功把我们最开始的ob.subscibe()订阅时创建的闭包通过AnyObserver(self)作为参数传给了闭包A,当我们在闭包A里面调用这行代码时:observer.onNext("kongyulu")时,由于经过ob.subscribe()订阅之后,AnyObserver(self)就是我们的observer.了,而此时的observer是一个结构体,它拥有了我们的管子AnonymousObservableSink对象的on()方法。
    • 在实例1中:当我们发送observer.onNext("kongyulu")序列消息时,实际上会通过我们的管子AnonymousObservableSink.on()来调度,最终调度我们订阅时的闭包:onNext()闭包Blet dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }
    • 那么现在最大的疑问就是**:AnonymousObservableSink.on()时如何从observer.onNext("kongyulu")调度到我们闭包B?**
  • (18)要分析上面这个问题,我们需要先来分析一下结构体AnyObserver(self)做了什么:先看一下AnyObsevrer的源码
public struct AnyObserver<Element> : ObserverType {
    /// Anonymous event handler type.
    
    public typealias EventHandler = (Event<Element>) -> Void
    //这里定义了别名EventHandler就是一个传入事件的闭包
    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    
    public init(eventHandler: @escaping EventHandler) {
    //self.observer保存了AnonymousObservableSink对象
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    //初始化时要求传入一个ObserverType,而这个是(17)点分析中AnyObserver(self)代码中的self,实际上就是AnonymousObservableSink对象,
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
    //这段代码直接保存了AnonymousObservableSink.on()方法
    //self.observer实际就是一个on()方法
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
    //这里调用on方法实际就是调用AnonymousObservableSink.on(event)方法
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<Element> {
        return self
    }
}
  • (19)通过上面AnyObserver源码分析,我们得知AnyObserver初始化时保存了我们管子AnonymousObservableSinkon()方法,并且自己有一个on方法,在他自己的on方法里面再去调用AnonymousObservableSink.on()方法。这样就只是包装了一层不让外界知道我们的AnonymousObservableSink类,为啥这样设计呢?这样设计有几点好处:

    • 起到完全封装效果,外界完全不需要知道我们的管子AnonymousObservableSink类,他们不关心我们AnonymousObservableSink类时如何实现的,使用者只需要用这个接口on()就行了,至于on()是如何实现的,通过谁实现并不需要关心。
    • 起到解耦的效果,AnyObserver 并没有拥有我们AnonymousObservableSink对象,它只是拥有了AnonymousObservableSink的on()接口,只需要AnonymousObservableSink实现这个on()接口该做的事情就可以了。至于AnonymousObservableSink内部怎么改(只要on()接口不改)的都不会影响到AnyObserver
  • (20)现在我们的重点就在on()方法上面:

    • 当我们实例1中执行:observer.onNext("kongyulu")这行代码时,实际就会调用:AnyObserver.onNext()方法。(由于我们AnyObserver继承了ObserverType协议,也就拥有了ObserverTypeonNext()方法,此处如果不清楚可以往上回看类继承关系)
  • (21)AnyObserver.onNext()调用的时候会调用自己的on()方法:

ObserverType的接口定义

extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))//这里会调回到AnyObserver的on()方法,AnyObserver继承ObserverType,重写了on()接口
    }
    public func onCompleted() {
        self.on(.completed)
    }
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
  • (22) AnyObserver.on() 方法会调用 AnonymousObservableSink.on()方法。
  • (23)AnonymousObservableSink.on(event)会调用 AnonymousObservableSink.forwardOn(event)
  • (24)而在AnonymousObservableSink中没有定义forwardOn()方法,我们找到它的父类Sink里面实现了forwardOn() 源码如下:
class Sink<Observer: ObserverType> : Disposable {
    fileprivate let _observer: Observer
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
//初始化保存了self._observer实际就是:我们`self.asObservable().subscribe(observer)` 传入的`AnonymousObserver`对象
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        // 这里实际调用了`AnonymousObserver.on()`方法。
        self._observer.on(event)
    }

   ... 此次代码省略,不需要关注
}

  • (25)从上面的源码我们可以看到:Sink.forwardOn()实际调用了AnonymousObserver.on(),说白了就是:我们最开始实例1的observer.onNext("kongyulu")这行代码执行时,ob.onNext() 先调用AnyObserver.on()AnyObserver.on()又会调用AnonymousObservableSink.on()AnonymousObservableSink.on()又会调用AnonymousObservableSink.forwardOn(),接着AnonymousObservableSink.forwardOn()又会调用AnonymousObservableSink父类的Sink.forwardOn(),最后由Sink.forwardOn()调用了AnonymousObserver.on()
  • (26)到这里我们思路基本清晰了,我们再回到AnonymousObserver.on()方法定义:
  1. 首先我们查看类定义如下,并没有找到由on()方法:
final class AnonymousObserver<Element>: ObserverBase<Element> {
//次处给尾随闭包取了一个别名
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
//这里保存了一个传入的尾随闭包:这个尾随闭包就是我们ob.subscribe()时创建let observer = AnonymousObserver<Element> { event in这里是个尾随闭包B} 这里传入_eventHandler保存的就是尾随闭包B
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
    //这里回调了我们的尾随闭包B
        return self._eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}
  1. 于是我们来找它的父类ObserverBase:
class ObserverBase<Element> : Disposable, ObserverType {
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)//这里实际调用的是子类的onCore()
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }

    func dispose() {
        fetchOr(self._isStopped, 1)
    }
}
  1. 我们通过分析父类源码得知ObserverBase.on()最终调用了AnonymousObserver.onCore(),而在AnonymousObserver.onCore()里回调了_eventHandler(event)闭包B,而闭包B就是我们最初ob.subscribe()序列订阅时创建AnonymousObserver的尾随闭包,这样这个尾随闭包最终调用了我们订阅的onNext()方法。这样就解释了:实例1中,执行observer.onNext("kongyulu")这行代码就会回调let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") } 从而打印了 “订阅到了:kongyulu”

具体AnonymousObserver {B}的尾随闭包B的代码如下:

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
           
           ...此次无关代码先省略 
           //特别注意:AnonymousObserver<Element> { B}括号里面的尾随闭包我们称为B,最终会通过AnonymousObserver.onCore()函数调用闭包B
            let observer = AnonymousObserver<Element> { event in
                ...此次无关代码先省略 
                switch event {
                case .next(let value):
                    onNext?(value) //这里调用onNext(value)实际就是
                    
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
  • (27)通过(26)点分析我们应该弄明白了整个订阅的流程了, 简单总结就是
  1. 我们ob.create(闭包A)创建时将闭包A保存在AnonymousObservable 里变量_subscribeHandler
  2. 当我们调用ob.subscribe(闭包B)订阅序列时,会首先创建一个AnonymousObserver对象,并且会带一个尾随闭包C。然后通过self.asObservable().subscribe(AnonymousObserver) 经过一系列转化将AnyObserver传递给了闭包A
  3. 其中2中说一系列转化可以简单解释为:
  • self.asObservable().subscribe(AnonymousObserver) 实际就是ob.subscribe(AnonymousObserver)

  • ob.subscribe(AnonymousObserver)实际就是Producer.subscribe(AnonymousObserver)

  • Producer.subscribe(AnonymousObserver)会调用self.run(AnonymousObserver)

  • self.run(AnonymousObserver) 会创建一个AnonymousObservableSink管子对象sink,然后调用sink.run(AnonymousObservable)调用了管子的run()方法,并将ob传入了管子sink.

  • 而我们管子的sink.run(AnonymousObservable)方法里面调用了parent._subscribeHandler(AnyObserver(self))实际就是ob._subscribeHandler(AnyObserver(AnonymousObservableSink))也就是调用了闭包A

  • 而我们闭包A需要传入一个参数就是AnyObserver(AnonymousObservableSink),实际上AnyObserver只是一个结构体,它保存了AnonymousObservableSink.on()方法。

  • 当我们在闭包A里面调用observer.onNext("kongyulu")实际上就是AnyObserver.onNext("kongyulu"),而AnyObserver.onNext("kongyulu")会调用AnyObserver.on()

  • AnyObserver.on()接着又调用AnonymousObservableSink.on(event)这里event里面

  • AnonymousObservableSink类中AnonymousObservableSink.on(event) 接着又会去调用它自己的forwardOn(event)也就是AnonymousObservableSink.forwardOn(event)

  • AnonymousObservableSink.forwardOn(event) 实际上是调用它父类Sink.forwardOn(event)而在Sink父类初始化的时候已经保存了AnonymousObserver对象_observer。

  • Sink.forwardOn(event)会调用的是AnonymousObserver.on(event)

  • AnonymousObserver.on(event)实际会调用自己父类的ObserverBase.on(event)

  • ObserverBase.on(event) 实际又会调用子类的AnonymousObserver.onCore(event)

  • AnonymousObserver.onCore(event) 会调用self._eventHandler(event)而这里_eventHandler就是保存AnonymousObserver创建时传入的尾随闭包C这样就回调了闭包C

  • 闭包C中又根据event的事件不同,回调了闭包B,例如如event=.onNext事件,就会回调闭包B onNext{},也就是let dispose = ob.subscribe(onNext: { (anything) in print("订阅到了:\(anything)") }, onError: { (error) in print("订阅到了:\(error)") }, onCompleted: { print("完成了") }) { print("销毁回调") } 里面的这段代码:onNext: { (anything) in print("订阅到了:\(anything)") 从而就会打印:“订阅到了:kongyulu”

  • (28)

  • (29)

最后这里通过一个流程图来表达整个创建,订阅过程

2. 序列创建,订阅图解

序列创建流程图:孔雨露(QQ:282889543)

3. 序列订阅流程

3.1 序列销毁方式

上面讲解了序列的创建,订阅流程,在分析创建序列,订阅序列的源码时,我们已经隐隐约约的看到了我们开篇分析的dispose(),貌似在整个源码中各处都有着dispose的代码,那么序列到底是怎么销毁的呢?

为了解决这个疑问,我们下面将通过分析源码,来探索一下序列的销毁流程。

这里先看一张序列生命周期时序图:

序列生命周期时序图
通过这张时序图,结合上面的序列创建,订阅的流程分析,我可以先得出序列会被销毁的3种方式:

  • 方式一通过发送事件,让序列生命周期自动结束来释放序列资源。 一个序列只要发出了 error 或者 completed 事件,它的生命周期将结束,那么所有内部资源都会被释放,不需要我们手动释放。(这个结论在本篇博客讨论实例1实例2的时候已经验证了,只要发送了completed和error事件,就会调用onComplete并打印“销毁了”信息)

  • 方式二通过主动调用dispose()来释放。例如你需要提前释放序列资源或取消订阅的话,那么你可以对返回的可被清除的资源(Disposable) 调用 dispose 方法。

  • 方式三通过垃圾袋DisposeBag来回收资源,达到自动释放,这是官方推荐的方式。官方推荐使用清除包(DisposeBag)来管理订阅的生命周期,一般是把资源加入到一个全局的DisposeBag里面,它跟随着页面的生命周期,当页面销毁时DisposeBag也会随之销毁,同时DisposeBag里面的资源也会被一一释放。(这个结论在上面的DisposeBag分析中也证实了)

3.2 序列销毁实例分析

我们先来回顾一下本篇博客开始分析的实例1的代码 实例1:

func limitObservable(){
        // 创建序列
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            return Disposables.create { print("销毁释放了")} // dispose.dispose()
        }
        // 序列订阅
        let dispose = ob.subscribe(onNext: { (anything) in
            print("订阅到了:\(anything)")
        }, onError: { (error) in
            print("订阅到了:\(error)")
        }, onCompleted: {
            print("完成了")
        }) {
            print("销毁回调")
        }
        print("执行完毕")
        //dispose.dispose()
    }
  1. 上面的代码执行结果如下:
    image
  2. 通过上面的结果我们知道,这个创建的序列没有被销毁,即没有打印“销毁释放了”,也没有打印“销毁回调”。这是为什么呢?这个问题我们后面再通过分析源码Rx源码就知道了。
  3. 现在我们把上面代码的那行注释放开dispose.dispose() 这行代码去掉注释,然后重新运行,输出结果如下:
    image
3.3 序列销毁源码分析
  1. 通过上面实例1的代码,首先可以看到,在创建序列Observable<Any>.create()方法有一个尾随闭包,需要返回一个实现了Disposable协议的实例。而就是通过return Disposables.create { print("销毁释放了")} 这行代码返回的。由此我们确认Disposables.create { print("销毁释放了")}非常重要,我们先来发分析一下Disposables.create源码。
  2. 进入到Disposables.create()源码:我们想直接点击进去发现Disposables就是一个空结构体
public struct Disposables {
    private init() {}
}

看这个结构体连初始化方法都是私有的,说明它不能被继承,于是我们推测Disposables.create()一定通过扩展的方式实现的。所以我们在项目中搜索extension Disposables {关键字,可以找到如下:

image
这样我们找到第一个:AnonymousDisposable.swift文件进入找到第55行:

extension Disposables {

    /// Constructs a new disposable with the given action used for disposal.
    ///
    /// - parameter dispose: Disposal action which will be run upon calling `dispose`.
    public static func create(with dispose: @escaping () -> Void) -> Cancelable {
        return AnonymousDisposable(disposeAction: dispose)//这里dispose就是我们传入的尾随闭包
    }

}

通过上面源码,我们看到直接一行return AnonymousDisposable(disposeAction: dispose)就结束了,而dispose 就是我们实例1中 Disposables.create { print("销毁释放了")} // dispose.dispose() } 这行代码里面的尾随闭包: { print("销毁释放了")} 这里我们给他一个别名成为:闭包D

  1. 不用思考,接下来我们肯定要进入AnonymousDisposable类实现一探究竟:
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
    public typealias DisposeAction = () -> Void

    private let _isDisposed = AtomicInt(0)
    private var _disposeAction: DisposeAction?

    /// - returns: Was resource disposed.
    public var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)
    }

    fileprivate init(_ disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }

    // Non-deprecated version of the constructor, used by `Disposables.create(with:)`
    fileprivate init(disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }

    /// Calls the disposal action if and only if the current instance hasn't been disposed yet.
    ///
    /// After invoking disposal action, disposal action will be dereferenced.
    fileprivate func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            if let action = self._disposeAction {
                self._disposeAction = nil
                action()
            }
        }
    }
}
  1. 分析上面AnonymousDisposable类定义源码,我们可以得出以下结论:
  • 初始化的时候把外界传过来的闭包进行保存,传入进来的闭包我们就是我们第2点中分析的闭包D{ print("销毁释放了")}
  • 有一个dispose()方法,通过fetchOr(self._isDisposed, 1) == 0这行代码控制dispose()里面的内容只会被执行一次。(无论dispose()方法被执行多少次,if let action = self._disposeAction { self._disposeAction = nil action() } 这段代码最多会被执行一次)
  • dispose()方法中先把self._disposeAction赋值给临时变量action,然后置空self._disposeAction,再执行action()。这样操作的原因是如果_disposeAction闭包是一个耗时操作,也能够保证_disposeAction能够立即释放。
  1. AnonymousDisposable里面我们只看到了一些常规的保存等操作,结合我们最开始分析序列的创建流程经验(AnonymousDisposable就类似于AnonymousObservable),我们可以推断核心代码实现肯定在订阅这一块。

  2. 接下来,我们进入到observable.subscribe()方法来探究一些subscribe()的源码实现。

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
    -> Disposable {
    //1.这里定义disposable局部变量
    let disposable: Disposable
    //2.创建了Disposables对象
    if let disposed = onDisposed {
        disposable = Disposables.create(with: disposed)
    }
    else {
        disposable = Disposables.create()
    }
    //3.创建了一个AnonymousObserver对象,有一个重要的尾随闭包
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
            else {
                Hooks.defaultErrorHandler(callStack, error)
            }
            disposable.dispose() //这里当收到error事件就会回收释放资源
        case .completed:
            onCompleted?()
            disposable.dispose() //这里当收到completed事件就会回收释放资源
        }
    }
    
    return Disposables.create(
        self.asObservable().subscribe(observer),
        disposable//这里将我们创建的局部变量传给了self.asObservable().subscribe,也就是我们的Producer.subscribe
    )
}

分析上面subscribe()源码,结合开始的分析,我们可以得出以下结论:

  • subscribe()创建了一个Disposable对象,并保存了销毁回调闭包,当执行销毁时,会把消息回调出去。
  • 在收到错误或者完成事件时会执行disposable.dispose()释放资源。
  • return Disposables.create( self.asObservable().subscribe(observer), disposable ),这里返回的Disposable对象就是我们外面手动调用dispose.dispose()方法的dispose对象,或者说是加入到全局的DisposeBag的销毁者。
  1. 由6的分析,我们清楚知道最后一行代码return Disposables.create( self.asObservable().subscribe(observer), disposable )时关键点,我们接下进入:Disposables.create()源码:
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
    return BinaryDisposable(disposable1, disposable2)//返回一个二元销毁者对象。
}

上面代码我们看到create()直接返回了一个BinaryDisposable二元销毁者类对象,并将disposable1disposable2传入给了BinaryDisposable

  • 这里的disposable1就是self.asObservable().subscribe(observer) 也就是Producer..subscribe(observer)返回的disposer
  • disposable2就是我们subscribe()中创建局部变量let disposable: Disposable
  1. 接着我们来分析BinaryDisposable类到底是什么:
private final class BinaryDisposable : DisposeBase, Cancelable {

    private let _isDisposed = AtomicInt(0)

    // state
    private var _disposable1: Disposable?
    private var _disposable2: Disposable?

    /// - returns: Was resource disposed.
    var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)
    }

    init(_ disposable1: Disposable, _ disposable2: Disposable) {
        self._disposable1 = disposable1
        self._disposable2 = disposable2
        super.init()
    }

    func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            self._disposable1?.dispose()
            self._disposable2?.dispose()
            self._disposable1 = nil
            self._disposable2 = nil
        }
    }
}