RxSwift 源代码套路分解:操作符 Operator 部分,自定制一个试试

1,069 阅读10分钟

RxSwift 三大支柱:Observable 可观察对象,操作符 Operator 和线程调度 Schedulers

本文手把手,看下 RxSwift 是怎么实现操作符 Operator 的

实现套路三步走:

例子,是 sample 采样操作符

我们这样使用 sample


// firstSubject 是输入事件流
// secondSubject 是一个控制事件流

        let firstSubject = PublishSubject<String>()
        let secondSubject = PublishSubject<String>()
       
        
        firstSubject.sample(secondSubject).subscribe(onNext: { (content) in
            print(content)
        }).disposed(by: rx.disposeBag)
        
        // 输入事件流,有了事件
        firstSubject.onNext("1")
        // 控制事件流,去触发输入事件流的事件
        secondSubject.onNext("A")

第一步,从 Observable 调用的语法糖,到 Operator 相应的做事情的类

下面的代码表明,调用 .sample , 是一层封装,方便组合与调用

采样实际的实现逻辑,全部在创建的类 Sample

firstSubject.sample(secondSubject), 输入两个 observable,输出一个新的 observable .

extension ObservableType {
    public func sample<Source: ObservableType>(_ sampler: Source)
        -> Observable<Element> {
            return Sample(source: self.asObservable(), sampler: sampler.asObservable())
    }
}

第 2 步,从 Operator 相应的实现类,到实际处理订阅的若干 Sink 类

Sample 类,是事件生产者类 Producer 的子类

前半段,很简单,弄两属性,

firstSubject.sample(secondSubject), 就做了一个 Sample 的初始化,把 firstSubjectsecondSubject , 交给这两属性持有

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element

这方法比较核心,里面完成了 firstSubject.sample(secondSubject) 的两个事件输入流的管理

起作用的是,操作符对应的若干 Sink 类

final private class Sample<Element, SampleType>: Producer<Element> {
    fileprivate let _source: Observable<Element>
    fileprivate let _sampler: Observable<SampleType>

    init(source: Observable<Element>, sampler: Observable<SampleType>) {
        self._source = source
        self._sampler = sampler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

第 3 步,若干 Sink 类,完成对输入事件流的逻辑处理

* Sample 类的 Sink 一号,final private class SampleSequenceSink<Observer: ObserverType, SampleType> : Sink<Observer>

上一步的 let subscription = sink.run()run() 的部分

func run() -> Disposable {
        self._sourceSubscription.setDisposable(self._parent._source.subscribe(self))
        let samplerSubscription = self._parent._sampler.subscribe(SamplerSink(parent: self))
        return Disposables.create(_sourceSubscription, samplerSubscription)
    }
里面做的事情,建立订阅和内存管理
  • self._parent._source.subscribe(self) ,建立订阅,

self._parent._source 是 , firstSubject.sample(secondSubject) 中的 firstSubject

  • self._parent._sampler.subscribe(SamplerSink(parent: self), 继续建立订阅

self._parent._sampler 是, firstSubject.sample(secondSubject) 中的 secondSubject

  • 剩下的 self._sourceSubscription.setDisposable

return Disposables.create(_sourceSubscription, samplerSubscription), 是内存管理方面

管理 subscribe 订阅到的对象 ObservableType

* Sample 类的 Sink 2 号,final private class SamplerSink<Observer: ObserverType, SampleType>
  • SamplerSink 主要涉及 firstSubject.sample(secondSubject) 中的 secondSubject 的事件处理逻辑,采样时机控制

  • SampleSequenceSink 主要涉及 firstSubject.sample(secondSubject) 中的 firstSubject 的事件处理逻辑,输入事件状态管理

操作符 .sample -> 对应的类 Sample -> 若干 Sink 类, 完成事件管道的建设

套路三步,有些弯弯绕绕


sink, 事件的下沉,套路三步的底部

关键点一:Sink 类是什么?

粗点讲:

Sink 是从 Observable , 事件流 event sequence 中取出事件,取出事件含有的值的一种方式

Observable 里面的事件 Event, 不能够直接取出来,必须通过订阅,才能读取

操作符 Operator 处理, 可观察对象 Observable

Observable 起作用的方式,是 subscribe 订阅出事件 Event

Observable 事件流,就是传递事件 Event 的

Event 是一个枚举,有三种状态

  • 有事情, .next(value)

  • 没事情,分两种情况,正常结束 gg , .completed

和出错凉凉 .error(error)

public enum Event<Element> {
    /// 生产出下一个事件
    case next(Element)

    ///  事件流结束,失败
    case error(Swift.Error)

    /// 事件流完成,成功
    case completed
}

firstSubject.sample(secondSubject), 从输入两枚,到输出一个

要做的是,掏出输入两个 Observable 的事件,处理下,输出需要的 Observable

已经被弃用的 Variable, 掏出 .next(value) 中的 value , 比较简单, 事情想来就来,不符合响应式规范

Observable 的 Event 事件取出,只能通过订阅的方式 subscribe

简单的理解, Sink 类是一种取输入流事件的方式

下面是用例,继续 sample

* SampleSequenceSink 里面的订阅逻辑处理

self._parent._source.subscribe(self)

firstSubject.sample(secondSubject) 中的 firstSubject, 输入事件流,被采样的对象,

因为 firstSubject 订阅了 self ,即 SampleSequenceSink 类的对象

func _synchronized_on(_ event: Event<Element>) 中的 Event,就是 firstSubject 的事件

  • firstSubject 流入的 .next 正常事件,仅仅用来状态处理,其最新事件的值 element ,交给 _element 去记录

  • 他流入的 .completed 正常结束事件,也是用来状态处理,记录在内部属性 _atEnd

  • 他流入的 .error 出错了,异常结束事件,转发出去,

forwardOn 就是事件交给下一个,事件被输出了

self.dispose(), 相关对象被置为 nil, 事件流被立即终止

func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let element):
            self._element = element
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self._atEnd = true
            self._sourceSubscription.dispose()
        }
    }

* SamplerSink 里面的订阅逻辑处理

SampleSequenceSink 里面 run 中, self._parent._sampler.subscribe(SamplerSink(parent: self))

firstSubject.sample(secondSubject) 中的 secondSubject, 控制事件流,负责采样的,

订阅了 SamplerSink 的对象,

决定了 SamplerSinkfunc _synchronized_on(_ event: Event<Element>)

方法中流入的是 secondSubject 采样事件流

  • secondSubject 流入的 .next 正常事件,和 .completed 正常结束事件,一道处理了

_parent._element ,其中 _parentSampleSequenceSink 类的对象,

_parent._element ,就是 SampleSequenceSink 类记录 firstSubject 流入的 .next 正常事件最新的值 value

采样事件来了,就把输入事件最新的值,转发到输出的事件流 ,forwardOn 一下

再把 _parent._element 置为 nil, 这样不会重复采样

于此,.sample 操作符的实现逻辑,大致出来了

444444

case .next, .completed: 等同于 case .next(let _), .completed:secondSubject 采样流事件的值,没有处理

self._parent._atEnd,上一步记录的输入流终止了,采样流又来,

这个时候转发终止事件到输出,做一个内存释放

  • 他流入的 .error 出错了,异常结束事件,转发出去,

事件流结束,内存释放

func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next, .completed:
            if let element = _parent._element {
                self._parent._element = nil
                self._parent.forwardOn(.next(element))
            }

            if self._parent._atEnd {
                self._parent.forwardOn(.completed)
                self._parent.dispose()
            }
        case .error(let e):
            self._parent.forwardOn(.error(e))
            self._parent.dispose()
        }
    }

粗点讲,SampleSequenceSink 的直接输入是, firstSubject.sample(secondSubject) 中的输入事件流 firstSubject

SampleSequenceSink 的直接输出是 class Sample

SampleSequenceSink 做的事情主要是记录输入事件流 firstSubject 最新的状态,最近事件的值与是否结束

SamplerSink 的直接输入是, firstSubject.sample(secondSubject) 中的采样时机控制secondSubject

采样流到了,他就 self._parent.forwardOn,

self._parentSampleSequenceSink 的对象

self._parent.forwardOn, 就是控制住了输入流 firstSubject 到输出流的时机

关键点 2:把订阅逻辑连起来 wire it up,生产者 Producer 类的实现

* 很多操作符,都继承自生产者 Producer

Producer 类是 Observable<Element> 可观察对象的子类, 可观察即可供订阅

Producer 类的代码也很简单,

初始化,重写了 class Observable<Element> 的订阅方法 public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element

重写订阅方法 func subscribe(_ observer: Observer), 指的是走该订阅方法,就要走 run 方法,还做了个内存管理

不能直接调用, func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element

Producer 的子类,各种操作符对应的类,一定要重写自己的 func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)

class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }


// 该方法有简化,省略了线程管理相关
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {

            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
    }

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
         // 这个方法就是报错
        //  fatalError(lastMessage(), file: file, line: line)
        rxAbstractMethod()
    }
}

* 从我们调用说起

调用 public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable

先建立订阅 self.asObservable().subscribe(observer),

再处理事件,

AnonymousObserver 匿名观察者,我们调用 subscribe(onNext: ( 传入的想做的事情,作为匿名函数,也就是闭包,都交给 AnonymousObserver 类了

let observer = AnonymousObserver<Element> { event in
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }

AnonymousObserverclass ObserverBase<Element> 的子类

他的实现也很简单

他有一个闭包的属性,用来接我们传入的匿名函数。

重写了事件调用方法 func onCore(, 将我们传入的闭包执行了

final class AnonymousObserver<Element>: ObserverBase<Element> {
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
}

可观察基类 ObserverBase 类,是一个事件处理类

他的实现也简洁

RxSwift 大量使用泛型,

他不持有事件,他只能处理对应的事件


class ObserverBase<Element> : Disposable, ObserverType {
    private let _isStopped = AtomicInt(0)

    // 事件处理, 有一个锁机制,保证事件处理的线程安全
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    // 实际事件处理,子类必须重写该方法
    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }
    // 内存管理
    func dispose() {
        fetchOr(self._isStopped, 1)
    }
}


* 脉络连起来,wire it up

一般这样弄了,实际发生的步骤简单理下

firstSubject.sample(secondSubject).subscribe(onNext: { (content) in
            print(content)
        })

写下 .subscribe(onNext:,

就走 Sample 类的

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel) 里面的 observer,

就是 AnonymousObserver<Element> { event in }

每一次 self._parent.forwardOn(.error(e))

都会调用其父类 class Sink<Observer: ObserverType>self._observer.on(event)

self._observer 还是 AnonymousObserver<Element> { event in }

一句 subscribe, 可观察者 Observable 和观察者 Observer 之间的弯弯绕绕比较清晰了

大佬高深的技术,努力拆分,都是好理解的常识

再来一个例子: withLatestFrom

withLatestFromSample 很像,

区别是,

  • withLatestFrom , 前面是控制取事件,后面是输入事件

Sample 后面控制采样,前面是输入事件

  • withLatestFrom , 前面的控制事件,可以反复取输入事件的最新值

Sample, 输入事件的最新值,只支持一次采样

  • withLatestFrom , 两个输入事件的值都可用

Sample,采样事件的值,不可用

1111

这样调用

   // firstSubject 是控制事件流
   // secondSubject 是输入事件流
       let firstSubject = PublishSubject<String>()
        let secondSubject = PublishSubject<String>()
       
        firstSubject.withLatestFrom(secondSubject){
                  (first, second) in
                  return first + second
             }
             .subscribe(onNext: { print($0) })
        .disposed(by: rx.disposeBag)
       // 控制事件流,没输入, miss 掉
        firstSubject.onNext("1")
          // 输入事件流,有了事件
        secondSubject.onNext("A")
          // 控制事件流,去触发输入事件流的事件
        firstSubject.onNext("2")

套路第一步: 语法糖,withLatestFrom 函数到类 WithLatestFrom

public func withLatestFrom<Source: ObservableConvertibleType, ResultType>(_ second: Source, resultSelector: @escaping (Element, Source.Element) throws -> ResultType) -> Observable<ResultType> {
        return WithLatestFrom(first: self.asObservable(), second: second.asObservable(), resultSelector: resultSelector)
    }

套路第 2 步: 类 WithLatestFrom 到实现层的 Sink

下面的代码,看看就明白了

final private class WithLatestFrom<FirstType, SecondType, ResultType>: Producer<ResultType> {
    typealias ResultSelector = (FirstType, SecondType) throws -> ResultType
    
    fileprivate let _first: Observable<FirstType>
    fileprivate let _second: Observable<SecondType>
    fileprivate let _resultSelector: ResultSelector

    init(first: Observable<FirstType>, second: Observable<SecondType>, resultSelector: @escaping ResultSelector) {
        self._first = first
        self._second = second
        self._resultSelector = resultSelector
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
        let sink = WithLatestFromSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

套路第 3 步: 实现层的 Sink,关键两函数,func run() -> Disposablefunc _synchronized_on(_ event: Event<Element>)

* 主 Sink 类 WithLatestFromSink
  • func run() -> Disposable 先看

self._parent._first.subscribe(self) 表明,WithLatestFromSink 管理 firstSubject.withLatestFrom(secondSubject) 中的控制事件流,

self._parent._second.subscribe(sndO) 表明 WithLatestFromSecond 管理输入事件流

func run() -> Disposable {
        let sndSubscription = SingleAssignmentDisposable()
        let sndO = WithLatestFromSecond(parent: self, disposable: sndSubscription)
        
        sndSubscription.setDisposable(self._parent._second.subscribe(sndO))
        let fstSubscription = self._parent._first.subscribe(self)

        return Disposables.create(fstSubscription, sndSubscription)
    }
  • func _synchronized_on(_ event: Event<Element>) 再看

代码简洁,

self._latest 就是记录的输入事件流最新的值,

.next 事件里面,firstSubject.withLatestFrom(secondSubject) 中 firstSubject 的事件来了,如果之前有 secondSubject 事件的值,就处理掉。

没有就,不用管

 func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case let .next(value):
            guard let latest = self._latest else { return }
            do {
                let res = try self._parent._resultSelector(value, latest)
                
                self.forwardOn(.next(res))
            } catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        case let .error(error):
            self.forwardOn(.error(error))
            self.dispose()
        }
    }
* 到了辅助 Sink 类 WithLatestFromSecond

就是看事件处理,输入事件流 firstSubject.withLatestFrom(secondSubject) 中的 secondSubject,

其事件,出错就转发,.next 事件,就取值记录到主 SinkWithLatestFromSink

其余与前面 .sample 的分析类似

func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case let .next(value):
            self._parent._latest = value
        case .completed:
            self._disposable.dispose()
        case let .error(error):
            self._parent.forwardOn(.error(error))
            self._parent.dispose()
        }
    }

自定制操作符, .sample 做加强: 两个输入流的值,都要

稍微改造下 withLatestFrom 的代码,就是 sampleWithFrom , sample 加强版

WithLatestFromSink 的事件流方法加一句 self._latest = nil, 就好了

记录的输入事件流的最新值,一次可用。之前是,反复可用

func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case let .next(value):
            guard let latest = self._latest else { return }
            do {
                self._latest = nil
                let res = try self._parent._resultSelector(value, latest)
                
                self.forwardOn(.next(res))
            } catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }

至此, sampleWithFromsample 的采样次序不一致,

更改参数位置,很简单

PS: 有些三方库 Operator ,是原来事件流的进一步处理,不需要新建定制的事件流,

这些自定制的 Operator,是给定的 Observable 后面加几个函数,就好了

没什么状态管理,要用到类

github 链接

源代码要拖到相关 Pods 里面,编译使用