1、PublishSubject
PublishSubject将对观察者发送订阅后产生的元素,而在订阅前发送的元素将不会发送给观察者。
示例:
let subject = PublishSubject<String>()
subject.onNext("🐘")
subject.subscribe(onNext: { print("订阅到了: \($0)") })
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
打印结果:
订阅到了: 🐶
订阅到了: 🐱
源码分析:
通过PublishSubject
的继承关系,可以看出,PublishSubject
既是 可监听序列,也是 观察者。
(1) 在第一次执行 onNext
函数 subject.onNext("🐘")
时
PublishSubject.on
函数,并继续调用 dispatch
函数和 self._synchronized_on(event)
函数。
self._synchronized_on(event)
函数最终会返回self._observers
。- 而
self._observers
的类型是Bag<(Event<Element>) -> Void>
。 - 再查看
dispatch
函数的源码 经分析可以知道,其作用就是对bag
中保存的 代码块进行执行回调。
因为上一步传入的 bag
仅是一个初始化的 bag
中间没有保存任何代码块,所以不会执行任何回调。到这里,在第一次执行 subject.onNext("🐘")
已经结束。没有任何打印结果。
(2) 调用 subscribe
。
- 在
PublishSubject
源码中,实现了subscribe
。 - 最终会调用
let key = self._observers.insert(observer.on)
- 查看
insert
函数源码。 因为是第首次执行,所以_dictionary
值为nil,且_pairs.count
小于arrayDictionaryMaxSize
。所以,观察者的on
函数observer.on
被加入到_pairs
数组中。
(3)第二次执行 onNext
函数 subject.onNext("🐶")
。开始的步骤和 (1)中一致,只是在最后执行 dispatch
函数时,因为 bag._pairs
有保存一个观察者的 on
函数代码块,所以会执行回调。
最终会回调 subscribe.onNext
闭包,打印结果。(ps: 这一步不清楚的小伙伴请阅读RxSwift核心逻辑简介)
2、BehaviorSubject
当观察者对BehaviorSubject进行订阅时,它会将源
Observable
中最新的元素发送出来。如果不存在最新的元素,就发送默认元素。然后将随后产生的元素发送出来。
示例:
let subject = BehaviorSubject<Int>(value: 100)
subject.subscribe(onNext: { print("订阅1:\($0)") })
.disposed(by: disposeBag)
subject.onNext(3)
subject.onNext(5)
subject.subscribe(onNext: { print("订阅2:\($0)") })
.disposed(by: disposeBag)
打印结果:
订阅1:100
订阅1:3
订阅1:5
订阅2:5
源码分析: BehaviorSubject
和 PublishSubject
源码很相似,但是也有差异之处,我们重点分析其中的差异,相似之处请参照上面的 PublishSubject
源码分析。
(1)初始化方法
public init(value: Element) {
self._element = value
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
BehaviorSubject
的初始化方法需要传入一个初始值,作为默认元素。
(2) subscribe
函数
BehaviorSubject
在 _synchronized_subscribe
函数中比 PublishSubject
多一行代码,执行了一次 observer.on
函数,并将 self._element
作为参数传递。 self._element
中保存的是最新发送的元素,如果没有最新元素,则为 init
初始化时的默认元素。
(3)onNext
函数
BehaviorSubject
在调用 onNext
时,会将最新的元素保存在 self._element
中,在执行执行 subscribe
时,发送出去。
3、ReplaySubject
无论观察者是何时进行订阅的,ReplaySubject都将对观察者发送全部的元素。
示例:
let subject = ReplaySubject<Int>.create(bufferSize: 2)
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.subscribe(onNext: { print("订阅到:\($0)") })
.disposed(by: disposeBag)
subject.onNext(4)
subject.onNext(5)
subject.onNext(6)
打印结果:
订阅到:2
订阅到:3
订阅到:4
订阅到:5
订阅到:6
源码分析:
(1)初始化方法 create
函数
bufferSize
为 2,所以创建一个 ReplayMany
对象。
其作用是创建一个指定大小的队列 queue
来保存需要发送的元素。
(2)onNext
函数
onNext
函数调用的是 ReplayBufferBase
的 on
函数。
- 调用
addValueToBuffer
函数将发送的元素加入到queue
中。 - 调用
trim
函数删除queue
中大于bufferSize
的多余的元素。
(3)subscribe
函数
observer
加入到 self._observers
之前,先调用了 self.replayBuffer(anyObserver)
。会执行 ReplayManyBase.replayBuffer
函数。
override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
for item in self._queue {
observer.on(.next(item))
}
}
将保存在队列 queue
中的所有元素,发送给订阅者。
4、AsyncSubject
AsyncSubject 将在源
Observable
产生完成时间之后,发出最后一个元素(有且仅有最后一个元素)。如果源Observable
没有发出任何元素,只有一个完成事件,则AsyncSubject也只有一个完成事件。如果源Observable
产生了一个error
事件而中止,那么 AsyncSubject 就不会发出任何元素,而是将error
事件发送出来。
示例:
let subject = AsyncSubject<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe({ print("订阅到:\($0)")})
.disposed(by: disposeBag)
subject.onNext(3)
subject.onNext(4)
subject.onCompleted()
打印结果:
订阅到:next(4)
订阅到:completed
源码分析:重点分析on
函数
最后会来到下面👇的代码,获取保存的 observer.on
闭包,再执行事件回调。
- 在
next
事件中,将最新的元素保存在self._lastElement
中后,并没有返回所有的observer.on
和 发送next
事件,反而是初始化了一个空的observer.on
集合和返回completed
事件。所以不会有观察者响应事件收到信号。 - 在
error
事件中,会清空所有的self._observers
,并返回所有的observer.on
和error
事件,所以,所有的观察者都只会收到error
信号。 - 在
completed
事件中。 会判断是否有发送过来的最新元素:如果有,就将最新元素发送出去,并执行next
事件。 并且在执行next
事件之后,会执行completed
事件。如果没有最新元素,则仅对所有observer.on
发送completed
事件。
5、Variable(已弃用)
示例:
let subject = Variable.init(1)
subject.value = 10
subject.value = 100
subject.asObservable().subscribe({ print("订阅到:\($0)")})
.disposed(by: disposeBag)
subject.value = 1000
打印结果:
ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
订阅到:next(100)
订阅到:next(1000)
订阅到:completed
源码分析:
从源码可以看出,Variable
虽然没有继承自 ObserverType
或者 Observable
。但是其有一个 _subject: BehaviorSubject<Element>
属性。所以,Variable
的行为和 BehaviorSubject
是一致的。但因为不是继承自 ObserverType
,所以没有 on
函数,不能直接调用 on
函数发送信号。
- 在初始化时,使用初始化值,初始化
BehaviorSubject
,并保存在self._subject
中。 - 对
value
做了一层封装,在value
的set
函数中,会调用_subject
的on
函数。完成信号的发送。
官方推荐使用 BehaviorRelay 和 BehaviorSubject 作为替换。
6、BehaviorRelay
BehaviorRelay 就是 BehaviorSubject 去掉终止事件
onError
和onCompleted
。
示例:
let subject = BehaviorRelay(value: 1)
subject.accept(10)
subject.subscribe({ print("订阅到:\($0)")})
.disposed(by: disposeBag)
subject.accept(100)
subject.accept(1000)
打印结果:
订阅到:next(10)
订阅到:next(100)
订阅到:next(1000)
源码分析:
查看源码,请注意BehaviorRelay
上方的注释,注释中说得非常清楚,BehaviorRelay
是对 BehaviorSubject
的封装,但是和 BehaviorSubject
不一样的地方在于,BehaviorRelay
不会被 error
和 completed
事件终止。
既然已经有了 BehaviorSubject,又为何需要BehaviorRelay 来对其进行封装呢? 一般来说,如果需要知道 BehaviorSubject 当前的发送的信号值,只能在
subscribe
中获取,但是使用 BehaviorRelay 则可以方便的使用BehaviorRelay.value
获取到当前的信号,非常之方便。
以上就是对常见的 Subject 的一些分析,若有不足之处,请评论指正。