RxSwift篇章
看一个非常特殊的类型-Subject.为什么说它特殊呢?原因很简单:**Subject既可以做序列,也可以做观察者!**正是因为这一特性,所以在实际开发中被大量运用。下面我们一起来解读一下这个特殊的Subject
原理
SubjectType
/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
///
associatedtype Observer: ObserverType
///
func asObserver() -> Observer
}
-
SubjectType首先就是继承了ObservableType,具有序列特性.
-
关联了观察者类型,具备这个类型的能力.
-
下面我们通过一个具体类型来感受一下subject.
//创建序列
let publishSub = PublishSubject<Int>()
//发送响应
publishSub.onNext(1)
//订阅序列
publishSub.subscribe{
print("订阅到了:",$0)
}
.disposed(by: disposbag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
-
很明显能够订阅信号和发送响应.
-
查看底层源码分析
订阅流程
public override func subscribe(_ observer: Observer)... -> Disposable
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe(_ observer: Observer)... -> Disposable {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
- self._observers.insert(observer.on): 通过一个集合添加进去所有的订阅事件,很明显可以处理全部响应.
发送信号流程
public func on(_ event: Event<Element>) {
...
dispatch(self._synchronized_on(event), event)
}
- 这里主要调用了dispatch函数,传了两个参数:self._synchronized_on(event)和event
- 分析self._synchronized_on(event)
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
-
这里如果self._isDisposed || self._stopped成立就会返回一个空的集合,也就没有序列的响应
-
在.completed, .error都会改变状态self._stopped = true,也就是说序列完成或者错误之后都无法再次响应了
-
在.completed, .error还会移除添加在集合里面的内容
-
根据事件状态,重点看返回观察者 observers,继续dispatch函数分析
func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
-
bag._value0?(event)首先执行事件的回调
-
判断bag._onlyFastPath的情况,默认为快速通道!
-
如果是开启慢速通道,需要从pairs,dictionary中依次处理外界事件回调.
小结
相对普通序列,subject 把订阅流程和响应流程都内部实现了,所以也就没有必要引入sink!
Subject分类
PublishSubject
初始化简单(不带默认值),并且它只会向订阅者发送在订阅之后才接收到的元素。
let publishSub = PublishSubject<Int>()
//发送
publishSub.onNext(1)
//订阅
publishSub.subscribe{
print("订阅到了:",$0)
}
.disposed(by: disposbag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
- 信号:1 是无法被订阅的,只接受订阅之后的响应.
BehaviorSubject
通过一个默认初始值来创建,当订阅者订阅BehaviorSubject时,会收到订阅后Subject上一个发出的Event,如果还没有收到任何数据,会发出一个默认值。之后就和PublishSubject一样,正常接收新的事件。
和publish 稍微不同就是behavior有个存储功能:存储上一次的信号.
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:发送信号
behaviorSub.onNext(2)
behaviorSub.onNext(3)
// 3:订阅序列
behaviorSub.subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
// 再次发送
behaviorSub.onNext(4)
behaviorSub.onNext(5)
// 再次订阅
behaviorSub.subscribe{ print("订阅到了:",$0)}
.disposed(by: disposbag)
源码分析
//初始化
public init(value: Element) {
self._element = value
}
//事件响应
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
if self._stoppedEvent != nil || self._isDisposed {
return Observers()
}
switch event {
case .next(let element):
self._element = element
case .error, .completed:
self._stoppedEvent = event
}
return self._observers
}
-
初始化的时候带有一个属性self._element保存一个信号
-
事件响应:新事件会覆盖原来的事件
ReplaySubject
相对于BehaviorSubject,只是存储能力的不同,ReplaySubject可以自定义个数.
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// 2:发送信号
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)
// 3:订阅序列
replaySub.subscribe{
print("订阅到了:",$0)
}
.disposed(by: disposbag)
// 再次发送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)
AsyncSubject
AsyncSubject只发送由源Observable发送的最后一个事件,并且只在源Observable完成之后。(如果源Observable没有发送任何值,AsyncSubject也不会发送任何值。)
let asynSub = AsyncSubject<Int>.init()
// 2:发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 3:订阅序列
asynSub.subscribe
{ print("订阅到了:",$0)
}
.disposed(by: disposbag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
asynSub.subscribe
{ print("订阅到了:",$0)
}
.disposed(by: disposbag)
asynSub.onCompleted()
- 源码响应处理
func _synchronized_on(_ event: Event<Element>) -> (Observers, Event<Element>) {
self._lock.lock(); defer { self._lock.unlock() }
if self._isStopped {
return (Observers(), .completed)
}
switch event {
case .next(let element):
self._lastElement = element
return (Observers(), .completed)
case .error:
self._stoppedEvent = event
let observers = self._observers
self._observers.removeAll()
return (observers, event)
case .completed:
let observers = self._observers
self._observers.removeAll()
if let lastElement = self._lastElement {
self._stoppedEvent = .next(lastElement)
return (observers, .next(lastElement))
}
else {
self._stoppedEvent = event
return (observers, .completed)
}
}
}
-
可以很清晰的看出,普通Next事件都是元素的替换,根本没有响应出来. complete事件发送到时候,就会把最新保存的self._lastElement当成事件值传出去,响应.next(lastElement)
-
如果没有保存事件就发送完成事件:.completed
-
error事件会移空整个响应集合:self._observers.removeAll()
BehaviorRelay
-
可以储存一个信号
-
随时订阅响应
-
响应发送的时候要注意为, .accept()
let behaviorRelay = BehaviorRelay(value: 100)
behaviorRelay.subscribe { (result) in
print(result)
}
.disposed(by: disposbag)
print("打印:\(behaviorRelay.value)")
behaviorRelay.accept(1000)
源码
public final class BehaviorRelay<Element>: ObservableType {
private let _subject: BehaviorSubject<Element>
/// Accepts `event` and emits it to subscribers
public func accept(_ event: Element) {
self._subject.onNext(event)
}
/// Initializes
public init(value: Element) {
self._subject = BehaviorSubject(value: value)
}
}
- 内部封装了BehaviorSubject,非常明显.