RxSwift之Subject

1,424 阅读3分钟

1、PublishSubject

PublishSubject将对观察者发送订阅后产生的元素,而在订阅前发送的元素将不会发送给观察者。

示例:

let subject = PublishSubject<String>()

subject.onNext("🐘")

subject.subscribe(onNext: { print("订阅到了: \($0)") })
    .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")
打印结果:
    订阅到了: 🐶
    订阅到了: 🐱

源码分析:

通过 PublishSubject 的继承关系,可以看出,PublishSubject 既是 可监听序列,也是 观察者

(1) 在第一次执行 onNext 函数 subject.onNext("🐘")

会执行 PublishSubject.on 函数,并继续调用 dispatch 函数和 self._synchronized_on(event) 函数。

  • self._synchronized_on(event) 函数最终会返回 self._observers
  • self._observers 的类型是 Bag<(Event<Element>) -> Void>
  • 再查看 dispatch 函数的源码
    经分析可以知道,其作用就是对 bag 中保存的 代码块进行执行回调。

因为上一步传入的 bag 仅是一个初始化的 bag 中间没有保存任何代码块,所以不会执行任何回调。到这里,在第一次执行 subject.onNext("🐘") 已经结束。没有任何打印结果。

(2) 调用 subscribe

  • PublishSubject 源码中,实现了 subscribe
  • 最终会调用 let key = self._observers.insert(observer.on)
  • 查看 insert 函数源码。
    因为是第首次执行,所以 _dictionary 值为nil,且 _pairs.count 小于 arrayDictionaryMaxSize。所以,观察者的 on函数 observer.on 被加入到 _pairs 数组中。

(3)第二次执行 onNext 函数 subject.onNext("🐶")。开始的步骤和 (1)中一致,只是在最后执行 dispatch 函数时,因为 bag._pairs 有保存一个观察者的 on 函数代码块,所以会执行回调。

最终会回调 subscribe.onNext 闭包,打印结果。(ps: 这一步不清楚的小伙伴请阅读RxSwift核心逻辑简介

2、BehaviorSubject

当观察者对BehaviorSubject进行订阅时,它会将源 Observable 中最新的元素发送出来。如果不存在最新的元素,就发送默认元素。然后将随后产生的元素发送出来。

示例:

let subject = BehaviorSubject<Int>(value: 100)

subject.subscribe(onNext: { print("订阅1:\($0)") })
    .disposed(by: disposeBag)

subject.onNext(3)
subject.onNext(5)

subject.subscribe(onNext: { print("订阅2:\($0)") })
    .disposed(by: disposeBag)
打印结果:
    订阅1100
    订阅13
    订阅15
    订阅25

源码分析: BehaviorSubjectPublishSubject 源码很相似,但是也有差异之处,我们重点分析其中的差异,相似之处请参照上面的 PublishSubject 源码分析。

(1)初始化方法

public init(value: Element) {
    self._element = value
    
    #if TRACE_RESOURCES
    _ = Resources.incrementTotal()
    #endif
}

BehaviorSubject 的初始化方法需要传入一个初始值,作为默认元素。

(2) subscribe 函数

BehaviorSubject_synchronized_subscribe 函数中比 PublishSubject 多一行代码,执行了一次 observer.on 函数,并将 self._element 作为参数传递。 self._element 中保存的是最新发送的元素,如果没有最新元素,则为 init 初始化时的默认元素。

(3)onNext 函数

BehaviorSubject 在调用 onNext 时,会将最新的元素保存在 self._element 中,在执行执行 subscribe 时,发送出去。

3、ReplaySubject

无论观察者是何时进行订阅的,ReplaySubject都将对观察者发送全部的元素。

示例:

let subject = ReplaySubject<Int>.create(bufferSize: 2)

subject.onNext(1)
subject.onNext(2)
subject.onNext(3)

subject.subscribe(onNext: { print("订阅到:\($0)") })
    .disposed(by: disposeBag)

subject.onNext(4)
subject.onNext(5)
subject.onNext(6)
打印结果:
    订阅到:2
    订阅到:3
    订阅到:4
    订阅到:5
    订阅到:6

源码分析: (1)初始化方法 create函数

因为示例中传入的 bufferSize 为 2,所以创建一个 ReplayMany 对象。
其作用是创建一个指定大小的队列 queue 来保存需要发送的元素。

(2)onNext 函数 onNext 函数调用的是 ReplayBufferBaseon 函数。

  • 调用 addValueToBuffer 函数将发送的元素加入到 queue 中。
  • 调用 trim 函数删除 queue 中大于 bufferSize 的多余的元素。

(3)subscribe 函数

在将 observer 加入到 self._observers 之前,先调用了 self.replayBuffer(anyObserver)。会执行 ReplayManyBase.replayBuffer 函数。

override func replayBuffer<O: ObserverType>(_ observer: O) where O.E == Element {
    for item in self._queue {
        observer.on(.next(item))
    }
}

将保存在队列 queue 中的所有元素,发送给订阅者。

4、AsyncSubject

AsyncSubject 将在源 Observable 产生完成时间之后,发出最后一个元素(有且仅有最后一个元素)。如果源 Observable 没有发出任何元素,只有一个完成事件,则AsyncSubject也只有一个完成事件。如果源 Observable 产生了一个 error 事件而中止,那么 AsyncSubject 就不会发出任何元素,而是将 error 事件发送出来。

示例:

let subject = AsyncSubject<Int>()

subject.onNext(1)
subject.onNext(2)

subject.subscribe({ print("订阅到:\($0)")})
    .disposed(by: disposeBag)

subject.onNext(3)
subject.onNext(4)
subject.onCompleted()
打印结果:
    订阅到:next(4)
    订阅到:completed

源码分析:重点分析on 函数 最后会来到下面👇的代码,获取保存的 observer.on 闭包,再执行事件回调。

  • next 事件中,将最新的元素保存在 self._lastElement 中后,并没有返回所有的 observer.on 和 发送 next 事件,反而是初始化了一个空的 observer.on 集合和返回 completed 事件。所以不会有观察者响应事件收到信号。
  • error 事件中,会清空所有的 self._observers,并返回所有的 observer.onerror 事件,所以,所有的观察者都只会收到 error 信号。
  • completed 事件中。 会判断是否有发送过来的最新元素:如果有,就将最新元素发送出去,并执行 next 事件。
    并且在执行 next 事件之后,会执行 completed 事件。如果没有最新元素,则仅对所有 observer.on 发送 completed 事件。

5、Variable(已弃用)

示例:

let subject = Variable.init(1)

subject.value = 10
subject.value = 100

subject.asObservable().subscribe({ print("订阅到:\($0)")})
    .disposed(by: disposeBag)

subject.value = 1000
打印结果:
    ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
    订阅到:next(100)
    订阅到:next(1000)
    订阅到:completed

源码分析:

从源码可以看出,Variable 虽然没有继承自 ObserverType 或者 Observable。但是其有一个 _subject: BehaviorSubject<Element> 属性。所以,Variable 的行为和 BehaviorSubject 是一致的。但因为不是继承自 ObserverType,所以没有 on 函数,不能直接调用 on 函数发送信号。

  • 在初始化时,使用初始化值,初始化 BehaviorSubject,并保存在 self._subject 中。
  • value 做了一层封装,在 valueset 函数中,会调用 _subjecton 函数。完成信号的发送。

官方推荐使用 BehaviorRelayBehaviorSubject 作为替换。

6、BehaviorRelay

BehaviorRelay 就是 BehaviorSubject 去掉终止事件 onErroronCompleted

示例:

let subject = BehaviorRelay(value: 1)

subject.accept(10)

subject.subscribe({ print("订阅到:\($0)")})
    .disposed(by: disposeBag)

subject.accept(100)

subject.accept(1000)
打印结果:
    订阅到:next(10)
    订阅到:next(100)
    订阅到:next(1000)

源码分析:

查看源码,请注意 BehaviorRelay 上方的注释,注释中说得非常清楚,BehaviorRelay 是对 BehaviorSubject 的封装,但是和 BehaviorSubject 不一样的地方在于,BehaviorRelay 不会被 errorcompleted 事件终止。

既然已经有了 BehaviorSubject,又为何需要BehaviorRelay 来对其进行封装呢? 一般来说,如果需要知道 BehaviorSubject 当前的发送的信号值,只能在 subscribe 中获取,但是使用 BehaviorRelay 则可以方便的使用 BehaviorRelay.value 获取到当前的信号,非常之方便。

以上就是对常见的 Subject 的一些分析,若有不足之处,请评论指正。