RxSwift 三大支柱:Observable 可观察对象,操作符 Operator 和线程调度 Schedulers
本文手把手,看下 RxSwift 是怎么实现操作符 Operator 的
实现套路三步走:
例子,是 sample 采样操作符
我们这样使用 sample
// firstSubject 是输入事件流
// secondSubject 是一个控制事件流
let firstSubject = PublishSubject<String>()
let secondSubject = PublishSubject<String>()
firstSubject.sample(secondSubject).subscribe(onNext: { (content) in
print(content)
}).disposed(by: rx.disposeBag)
// 输入事件流,有了事件
firstSubject.onNext("1")
// 控制事件流,去触发输入事件流的事件
secondSubject.onNext("A")
第一步,从 Observable 调用的语法糖,到 Operator 相应的做事情的类
下面的代码表明,调用 .sample
, 是一层封装,方便组合与调用
采样实际的实现逻辑,全部在创建的类 Sample
firstSubject.sample(secondSubject)
, 输入两个 observable,输出一个新的 observable .
extension ObservableType {
public func sample<Source: ObservableType>(_ sampler: Source)
-> Observable<Element> {
return Sample(source: self.asObservable(), sampler: sampler.asObservable())
}
}
第 2 步,从 Operator 相应的实现类,到实际处理订阅的若干 Sink 类
Sample 类,是事件生产者类 Producer 的子类
前半段,很简单,弄两属性,
firstSubject.sample(secondSubject)
, 就做了一个 Sample 的初始化,把 firstSubject
和 secondSubject
, 交给这两属性持有
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element
,
这方法比较核心,里面完成了 firstSubject.sample(secondSubject)
的两个事件输入流的管理
起作用的是,操作符对应的若干 Sink 类
final private class Sample<Element, SampleType>: Producer<Element> {
fileprivate let _source: Observable<Element>
fileprivate let _sampler: Observable<SampleType>
init(source: Observable<Element>, sampler: Observable<SampleType>) {
self._source = source
self._sampler = sampler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
第 3 步,若干 Sink 类,完成对输入事件流的逻辑处理
* Sample 类的 Sink 一号,final private class SampleSequenceSink<Observer: ObserverType, SampleType> : Sink<Observer>
上一步的 let subscription = sink.run()
,run()
的部分
func run() -> Disposable {
self._sourceSubscription.setDisposable(self._parent._source.subscribe(self))
let samplerSubscription = self._parent._sampler.subscribe(SamplerSink(parent: self))
return Disposables.create(_sourceSubscription, samplerSubscription)
}
里面做的事情,建立订阅和内存管理
self._parent._source.subscribe(self)
,建立订阅,
self._parent._source
是 , firstSubject.sample(secondSubject)
中的 firstSubject
self._parent._sampler.subscribe(SamplerSink(parent: self)
, 继续建立订阅
self._parent._sampler
是, firstSubject.sample(secondSubject)
中的 secondSubject
- 剩下的
self._sourceSubscription.setDisposable
和
return Disposables.create(_sourceSubscription, samplerSubscription)
, 是内存管理方面
管理 subscribe 订阅到的对象 ObservableType
* Sample 类的 Sink 2 号,final private class SamplerSink<Observer: ObserverType, SampleType>
-
SamplerSink
主要涉及firstSubject.sample(secondSubject)
中的secondSubject
的事件处理逻辑,采样时机控制 -
SampleSequenceSink
主要涉及firstSubject.sample(secondSubject)
中的firstSubject
的事件处理逻辑,输入事件状态管理
操作符 .sample -> 对应的类 Sample -> 若干 Sink 类, 完成事件管道的建设
套路三步,有些弯弯绕绕
sink, 事件的下沉,套路三步的底部
关键点一:Sink 类是什么?
粗点讲:
Sink 是从 Observable , 事件流 event sequence 中取出事件,取出事件含有的值的一种方式
Observable 里面的事件 Event, 不能够直接取出来,必须通过订阅,才能读取
操作符 Operator 处理, 可观察对象 Observable
Observable 起作用的方式,是 subscribe 订阅出事件 Event
Observable 事件流,就是传递事件 Event 的
Event 是一个枚举,有三种状态
-
有事情,
.next(value)
-
没事情,分两种情况,正常结束 gg ,
.completed
和出错凉凉 .error(error)
public enum Event<Element> {
/// 生产出下一个事件
case next(Element)
/// 事件流结束,失败
case error(Swift.Error)
/// 事件流完成,成功
case completed
}
firstSubject.sample(secondSubject)
, 从输入两枚,到输出一个
要做的是,掏出输入两个 Observable
的事件,处理下,输出需要的 Observable
已经被弃用的
Variable
, 掏出.next(value)
中的value
, 比较简单, 事情想来就来,不符合响应式规范
Observable
的 Event 事件取出,只能通过订阅的方式 subscribe
简单的理解, Sink 类是一种取输入流事件的方式
下面是用例,继续 sample
* SampleSequenceSink
里面的订阅逻辑处理
self._parent._source.subscribe(self)
firstSubject.sample(secondSubject)
中的 firstSubject
, 输入事件流,被采样的对象,
因为 firstSubject
订阅了 self
,即 SampleSequenceSink
类的对象
func _synchronized_on(_ event: Event<Element>)
中的 Event,就是 firstSubject
的事件
-
firstSubject
流入的.next
正常事件,仅仅用来状态处理,其最新事件的值 element ,交给_element
去记录 -
他流入的
.completed
正常结束事件,也是用来状态处理,记录在内部属性_atEnd
-
他流入的
.error
出错了,异常结束事件,转发出去,
forwardOn
就是事件交给下一个,事件被输出了
self.dispose()
, 相关对象被置为 nil, 事件流被立即终止
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next(let element):
self._element = element
case .error:
self.forwardOn(event)
self.dispose()
case .completed:
self._atEnd = true
self._sourceSubscription.dispose()
}
}
* SamplerSink
里面的订阅逻辑处理
SampleSequenceSink
里面 run
中, self._parent._sampler.subscribe(SamplerSink(parent: self))
firstSubject.sample(secondSubject)
中的 secondSubject
, 控制事件流,负责采样的,
订阅了 SamplerSink
的对象,
决定了 SamplerSink
的 func _synchronized_on(_ event: Event<Element>)
方法中流入的是 secondSubject
采样事件流
secondSubject
流入的.next
正常事件,和.completed
正常结束事件,一道处理了
_parent._element
,其中 _parent
是 SampleSequenceSink
类的对象,
_parent._element
,就是 SampleSequenceSink
类记录 firstSubject
流入的 .next
正常事件最新的值 value
采样事件来了,就把输入事件最新的值,转发到输出的事件流 ,forwardOn
一下
再把 _parent._element
置为 nil, 这样不会重复采样
于此,.sample
操作符的实现逻辑,大致出来了
case .next, .completed:
等同于 case .next(let _), .completed:
, secondSubject
采样流事件的值,没有处理
self._parent._atEnd
,上一步记录的输入流终止了,采样流又来,
这个时候转发终止事件到输出,做一个内存释放
- 他流入的
.error
出错了,异常结束事件,转发出去,
事件流结束,内存释放
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next, .completed:
if let element = _parent._element {
self._parent._element = nil
self._parent.forwardOn(.next(element))
}
if self._parent._atEnd {
self._parent.forwardOn(.completed)
self._parent.dispose()
}
case .error(let e):
self._parent.forwardOn(.error(e))
self._parent.dispose()
}
}
粗点讲,SampleSequenceSink
的直接输入是, firstSubject.sample(secondSubject)
中的输入事件流 firstSubject
SampleSequenceSink
的直接输出是 class Sample
SampleSequenceSink
做的事情主要是记录输入事件流 firstSubject
最新的状态,最近事件的值与是否结束
SamplerSink
的直接输入是, firstSubject.sample(secondSubject)
中的采样时机控制secondSubject
采样流到了,他就 self._parent.forwardOn
,
self._parent
即 SampleSequenceSink
的对象
self._parent.forwardOn
, 就是控制住了输入流 firstSubject 到输出流的时机
关键点 2:把订阅逻辑连起来 wire it up,生产者 Producer
类的实现
* 很多操作符,都继承自生产者 Producer
Producer
类是 Observable<Element>
可观察对象的子类, 可观察即可供订阅
Producer
类的代码也很简单,
初始化,重写了 class Observable<Element>
的订阅方法 public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
重写订阅方法 func subscribe(_ observer: Observer)
, 指的是走该订阅方法,就要走 run
方法,还做了个内存管理
不能直接调用, func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element
Producer
的子类,各种操作符对应的类,一定要重写自己的 func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
// 该方法有简化,省略了线程管理相关
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
// 这个方法就是报错
// fatalError(lastMessage(), file: file, line: line)
rxAbstractMethod()
}
}
* 从我们调用说起
调用 public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable
,
先建立订阅 self.asObservable().subscribe(observer)
,
再处理事件,
AnonymousObserver
匿名观察者,我们调用 subscribe(onNext: (
传入的想做的事情,作为匿名函数,也就是闭包,都交给 AnonymousObserver 类了
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
AnonymousObserver
是 class ObserverBase<Element>
的子类
他的实现也很简单
他有一个闭包的属性,用来接我们传入的匿名函数。
重写了事件调用方法 func onCore(
, 将我们传入的闭包执行了
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
可观察基类 ObserverBase
类,是一个事件处理类
他的实现也简洁
RxSwift 大量使用泛型,
他不持有事件,他只能处理对应的事件
class ObserverBase<Element> : Disposable, ObserverType {
private let _isStopped = AtomicInt(0)
// 事件处理, 有一个锁机制,保证事件处理的线程安全
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
// 实际事件处理,子类必须重写该方法
func onCore(_ event: Event<Element>) {
rxAbstractMethod()
}
// 内存管理
func dispose() {
fetchOr(self._isStopped, 1)
}
}
* 脉络连起来,wire it up
一般这样弄了,实际发生的步骤简单理下
firstSubject.sample(secondSubject).subscribe(onNext: { (content) in
print(content)
})
写下 .subscribe(onNext:
,
就走 Sample
类的
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel)
里面的 observer,
就是 AnonymousObserver<Element> { event in }
每一次 self._parent.forwardOn(.error(e))
,
都会调用其父类 class Sink<Observer: ObserverType>
的 self._observer.on(event)
self._observer
还是 AnonymousObserver<Element> { event in }
一句 subscribe, 可观察者 Observable 和观察者 Observer 之间的弯弯绕绕比较清晰了
大佬高深的技术,努力拆分,都是好理解的常识
再来一个例子: withLatestFrom
withLatestFrom
和 Sample
很像,
区别是,
withLatestFrom
, 前面是控制取事件,后面是输入事件
Sample
后面控制采样,前面是输入事件
withLatestFrom
, 前面的控制事件,可以反复取输入事件的最新值
Sample
, 输入事件的最新值,只支持一次采样
withLatestFrom
, 两个输入事件的值都可用
Sample
,采样事件的值,不可用
这样调用
// firstSubject 是控制事件流
// secondSubject 是输入事件流
let firstSubject = PublishSubject<String>()
let secondSubject = PublishSubject<String>()
firstSubject.withLatestFrom(secondSubject){
(first, second) in
return first + second
}
.subscribe(onNext: { print($0) })
.disposed(by: rx.disposeBag)
// 控制事件流,没输入, miss 掉
firstSubject.onNext("1")
// 输入事件流,有了事件
secondSubject.onNext("A")
// 控制事件流,去触发输入事件流的事件
firstSubject.onNext("2")
套路第一步: 语法糖,withLatestFrom
函数到类 WithLatestFrom
public func withLatestFrom<Source: ObservableConvertibleType, ResultType>(_ second: Source, resultSelector: @escaping (Element, Source.Element) throws -> ResultType) -> Observable<ResultType> {
return WithLatestFrom(first: self.asObservable(), second: second.asObservable(), resultSelector: resultSelector)
}
套路第 2 步: 类 WithLatestFrom
到实现层的 Sink
下面的代码,看看就明白了
final private class WithLatestFrom<FirstType, SecondType, ResultType>: Producer<ResultType> {
typealias ResultSelector = (FirstType, SecondType) throws -> ResultType
fileprivate let _first: Observable<FirstType>
fileprivate let _second: Observable<SecondType>
fileprivate let _resultSelector: ResultSelector
init(first: Observable<FirstType>, second: Observable<SecondType>, resultSelector: @escaping ResultSelector) {
self._first = first
self._second = second
self._resultSelector = resultSelector
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
let sink = WithLatestFromSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
套路第 3 步: 实现层的 Sink,关键两函数,func run() -> Disposable
和 func _synchronized_on(_ event: Event<Element>)
* 主 Sink 类 WithLatestFromSink
,
func run() -> Disposable
先看
self._parent._first.subscribe(self)
表明,WithLatestFromSink
管理 firstSubject.withLatestFrom(secondSubject)
中的控制事件流,
self._parent._second.subscribe(sndO)
表明 WithLatestFromSecond
管理输入事件流
func run() -> Disposable {
let sndSubscription = SingleAssignmentDisposable()
let sndO = WithLatestFromSecond(parent: self, disposable: sndSubscription)
sndSubscription.setDisposable(self._parent._second.subscribe(sndO))
let fstSubscription = self._parent._first.subscribe(self)
return Disposables.create(fstSubscription, sndSubscription)
}
func _synchronized_on(_ event: Event<Element>)
再看
代码简洁,
self._latest
就是记录的输入事件流最新的值,
.next
事件里面,firstSubject.withLatestFrom(secondSubject)
中 firstSubject 的事件来了,如果之前有 secondSubject 事件的值,就处理掉。
没有就,不用管
func _synchronized_on(_ event: Event<Element>) {
switch event {
case let .next(value):
guard let latest = self._latest else { return }
do {
let res = try self._parent._resultSelector(value, latest)
self.forwardOn(.next(res))
} catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .completed:
self.forwardOn(.completed)
self.dispose()
case let .error(error):
self.forwardOn(.error(error))
self.dispose()
}
}
* 到了辅助 Sink 类 WithLatestFromSecond
,
就是看事件处理,输入事件流 firstSubject.withLatestFrom(secondSubject)
中的 secondSubject,
其事件,出错就转发,.next
事件,就取值记录到主 Sink
类 WithLatestFromSink
其余与前面 .sample
的分析类似
func _synchronized_on(_ event: Event<Element>) {
switch event {
case let .next(value):
self._parent._latest = value
case .completed:
self._disposable.dispose()
case let .error(error):
self._parent.forwardOn(.error(error))
self._parent.dispose()
}
}
自定制操作符, .sample 做加强: 两个输入流的值,都要
稍微改造下 withLatestFrom
的代码,就是 sampleWithFrom
, sample
加强版
WithLatestFromSink
的事件流方法加一句 self._latest = nil
, 就好了
记录的输入事件流的最新值,一次可用。之前是,反复可用
func _synchronized_on(_ event: Event<Element>) {
switch event {
case let .next(value):
guard let latest = self._latest else { return }
do {
self._latest = nil
let res = try self._parent._resultSelector(value, latest)
self.forwardOn(.next(res))
} catch let e {
self.forwardOn(.error(e))
self.dispose()
}
至此, sampleWithFrom
和 sample
的采样次序不一致,
更改参数位置,很简单
PS: 有些三方库 Operator ,是原来事件流的进一步处理,不需要新建定制的事件流,
这些自定制的 Operator,是给定的 Observable 后面加几个函数,就好了
没什么状态管理,要用到类
github 链接
源代码要拖到相关 Pods 里面,编译使用