RxSwift源码解析一

1,816 阅读5分钟

RxSwift源码解析一

一、介绍

一个帮助我们简化异步编程的Swift框架。

二、核心

  • Observable:产生事件
  • Observer:响应事件
  • Operator:创建变化组合事件
  • Disposable:管理绑定(订阅)的生命周期
  • Schedulers:线程队列调配

三、ObservableObserver之间的关系

  • 例子
let _ = Observable<Int>.create { (observer) -> Disposable in
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onCompleted()
    return Disposables.create()
}.subscribe(onNext: { (num) in
    print("receive num \(num)")
}, onError: { (error) in
    print("error: \(error.localizedDescription)")
}, onCompleted: {
    print("receive complete")
}) 

如上代码出现两个重要的方法createsubscribe。顾名思义,create方法是创建一个Observable对象,而subscribe方法是创建一个订阅事件。我们先关注下create方法如何创建一个Observable对象。

//Create.swift
extension ObservableType {
    
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
    
}

首先看它传入的参数为一个闭包:AnyObserver<Element> -> Disposable,然后返回的是一个Observable<Element>对象。对比我们的例子,我们可以确定Element为我们指定的Int,即泛型Element表示数据源类型。

上面返回的是一个AnonymousObservable对象,并将闭包作为参数传入。

//Create.swift
final private class AnonymousObservable<Element> : Producer<Element> {
    
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
    let _subscribeHandler: SubscribeHandler
    
    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }
    
}

AnonymousObservable将传入的闭包赋值给变量_subscribeHandler。至此创建完了一个Observable对象。然后执行其subscribe方法:

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted:(() -> Void)? = nil, onDisposed:(() -> Void)? = nil) -> Disposable {
    let disposable: Disposable
    
    if let disposed = onDisposed {
        disposable = Disposables.create(with: disposed)
    } else {
        disposable = Disposables.create()
    }
    
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
        
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
    }
    return Disposables.create(self.asObservable().subscribe(observer), disposable)
}

这一段代码比较长,我们先从参数下手,可以看到参数中包括onNext(产生下一个事件)、onError(产生错误)、onCompleted(产生完成)和onDisposed四个不同的闭包。我们先暂时不管Disposed部分内容,直接看到下面相关代码:

let observer = AnonymousObserver<Element> { event in
    switch event {
    case .next(let value):
        onNext?(value)
    case .error(let error):
        if let onError = onError {
            onError(error)
        }
    
        disposable.dispose()
    case .completed:
        onCompleted?()
        disposable.dispose()
    }
}

上面代码创建了一个AnonymousObserver对象,并将参数的闭包事件与自身产生的event事件关联在一起。

final class AnonymousObserver<Element>: ObserverBase<Element> {
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler: EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }
    
}

AnonymousObserver对象将上面关联的一个事件转换闭包作为参数存储到变量_eventHandler中。其实可以简单地理解AnonymousObserver对象将上面subscribe方法中的参数闭包存储起来了。

再回到subscribe方法,看到最后一句代码:

return Disposables.create(self.asObservable().subscribe(observer), disposable)

我们关注到这里self.asObservable().subscribe(observer),首先调用了asObservable()方法:

//ObservableConvertibleType.swift
public protocol ObservableConvertibleType {
    
    associatedtype Element
    
    func asObservable() -> Observable<Element>
}

//Observable.swift
public class Observable<Element> : ObservableType {
    
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<Element> {
        return self
    }
    
}

如上所示,我们可以看到asObservable()方法返回的自己本身Observable。但我们看到这里对应的subscribe方法为"抽象方法",上面我们创建的是AnonymousObservable对象,在它的父类Producer中实现了:

class Producer<Element> : Observable<Element> {
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            return disposer
        } else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
                return disposer
            }
        }
    }
    
    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
    
}

同样我们先暂时不管Scheduler相关内容,这里调用了self.run()方法,但它本身并未实现该方法,同样我们在AnonymousObservale中可以找到:

final private class AnonymousObservable<Element> : Producer<Element> {
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Element == Observer.Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
    
}

run方法中创建了一个AnonymousObservableSink方法,然后调用了它的run方法。

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    
    typealias Element = Observer.Element
    typealias Parent = AnonymousObservable<Element>
    
    private let _isStopped = AtomicInt(0)
    
    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    
    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
    
}

我们先看到其run方法,可以看到它执行了parent._subscribeHandler(AnyObserver(self)),这一句很关键,这里的parent其实指的是我们在调用create方法时,创建的AnonymousObservable对象。因此,这里的_subscribeHandler就是我们create方法传递的参数闭包。我们可以看到这里创建了一个AnyObserver对象传入到闭包中。

回到例子中的闭包内容:

{ (observer) -> Disposable in
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onCompleted()
    return Disposables.create()
}

这里调用了onNext方法产生元素1

public protocol ObserverType {
    associatedtype Element
    
    func on(_ event: Event<Element>)
}

extension ObserverType {
    
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
    
}

这里调用了on方法传递元素,而我们上面知道这里的ObserverTypeAnyObserver对象:

public struct AnyObserver<Element> : ObserverType {
    
    public typealias EventHandler = (Event<Element>) -> Void
    
    private let observer: EventHandler
    
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
  
}

接着调用了self.observer(event)将事件传递下去,而这里的observer是在创建parent._subscribeHandler(AnyObserver(self))时传入的。即self.observer = observer.on => AnonymousObservableSink.on

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
    
}

因此,事件会传递到AnonymousObservableSink中,并通过fowardOn方法继续传递事件.


class Sink<Observer: ObserverType>: Disposable {
    
    fileprivate let _observer: Observer
    fileprivate let _cancel: Cancelable
    
    init(observer: Observer, cancel: Cancelable) {
        self._observer = observer
        self._cancel = cancel
    }
    
    final func forwardOn(_ event: Event<Observer.Element>) {
        if isFlagSet(self._diposed, 1) {
            return
        }
        self._observer.on(event)
    }
   
}

这里调用了self._observer.on(event)方法传递事件,而这里的_observer对象就是我们在调用subscribe方法时,传递进来的AnonymousObserver对象。而AnonymousObserver本身没有实现on方法,而是在父类ObserverBase中实现了:

class ObserverBase<Element> : Disposable, ObserverType {
    
    private let _isStopped = AtomicInt(0)
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
    
}

最后调用了onCore方法传递事件:

final class AnonymousObserver<Element>: ObserverBase<Element> {
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler: EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }
    
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
    
}

而这里的_eventHandler即是我们调用subscribe方法时,创建的闭包(将外部的Event和内部的Event关联)

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted:(() -> Void)? = nil, onDisposed:(() -> Void)? = nil) -> Disposable {
    ....
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
        
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
    }
    return Disposables.create(self.asObservable().subscribe(observer), disposable)
}

因此,我们会在外部接收到receive num 1的事件消息。

四、Observable与Observer的执行过程