RxSwift-Subject攻守兼备

780 阅读3分钟

RxSwift篇章

看一个非常特殊的类型-Subject.为什么说它特殊呢?原因很简单:**Subject既可以做序列,也可以做观察者!**正是因为这一特性,所以在实际开发中被大量运用。下面我们一起来解读一下这个特殊的Subject

原理

SubjectType

/// Represents an object that is both an observable sequence as well as an observer.

public protocol SubjectType : ObservableType {
    /// 
    associatedtype Observer: ObserverType
    /// 
    func asObserver() -> Observer
}
  • SubjectType首先就是继承了ObservableType,具有序列特性.

  • 关联了观察者类型,具备这个类型的能力.

  • 下面我们通过一个具体类型来感受一下subject.

        //创建序列
        let publishSub = PublishSubject<Int>()
        //发送响应
        publishSub.onNext(1)
        //订阅序列
        publishSub.subscribe{
            print("订阅到了:",$0)
        }
        .disposed(by: disposbag)
        // 再次发送响应
        publishSub.onNext(2)
        publishSub.onNext(3)
  • 很明显能够订阅信号和发送响应.

  • 查看底层源码分析

订阅流程

   public override func subscribe(_ observer: Observer)... -> Disposable 
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe(_ observer: Observer)... -> Disposable  {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }
  • self._observers.insert(observer.on): 通过一个集合添加进去所有的订阅事件,很明显可以处理全部响应.

发送信号流程

    public func on(_ event: Event<Element>) {
    ...
        dispatch(self._synchronized_on(event), event)
    }
  • 这里主要调用了dispatch函数,传了两个参数:self._synchronized_on(event)和event
  • 分析self._synchronized_on(event)
    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }

            return Observers()
        }
    }
  • 这里如果self._isDisposed || self._stopped成立就会返回一个空的集合,也就没有序列的响应

  • 在.completed, .error都会改变状态self._stopped = true,也就是说序列完成或者错误之后都无法再次响应了

  • 在.completed, .error还会移除添加在集合里面的内容

  • 根据事件状态,重点看返回观察者 observers,继续dispatch函数分析

func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}
  • bag._value0?(event)首先执行事件的回调

  • 判断bag._onlyFastPath的情况,默认为快速通道!

  • 如果是开启慢速通道,需要从pairs,dictionary中依次处理外界事件回调.

小结

相对普通序列,subject 把订阅流程和响应流程都内部实现了,所以也就没有必要引入sink!

Subject分类

PublishSubject

初始化简单(不带默认值),并且它只会向订阅者发送在订阅之后才接收到的元素。

        let publishSub = PublishSubject<Int>()
        //发送
        publishSub.onNext(1)
        //订阅
        publishSub.subscribe{
            print("订阅到了:",$0)
        }
        .disposed(by: disposbag)
        // 再次发送响应
        publishSub.onNext(2)
        publishSub.onNext(3)
  • 信号:1 是无法被订阅的,只接受订阅之后的响应.

BehaviorSubject

通过一个默认初始值来创建,当订阅者订阅BehaviorSubject时,会收到订阅后Subject上一个发出的Event,如果还没有收到任何数据,会发出一个默认值。之后就和PublishSubject一样,正常接收新的事件。

和publish 稍微不同就是behavior有个存储功能:存储上一次的信号.

        let behaviorSub = BehaviorSubject.init(value: 100)
        // 2:发送信号
        behaviorSub.onNext(2)
        behaviorSub.onNext(3)
        // 3:订阅序列
        behaviorSub.subscribe{ print("订阅到了:",$0)}
            .disposed(by: disposbag)
        // 再次发送
        behaviorSub.onNext(4)
        behaviorSub.onNext(5)
        // 再次订阅
        behaviorSub.subscribe{ print("订阅到了:",$0)}
            .disposed(by: disposbag)

源码分析

//初始化
    public init(value: Element) {
        self._element = value
    }
//事件响应
    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        if self._stoppedEvent != nil || self._isDisposed {
            return Observers()
        }
        
        switch event {
        case .next(let element):
            self._element = element
        case .error, .completed:
            self._stoppedEvent = event
        }
        
        return self._observers
    }
  • 初始化的时候带有一个属性self._element保存一个信号

  • 事件响应:新事件会覆盖原来的事件

ReplaySubject

相对于BehaviorSubject,只是存储能力的不同,ReplaySubject可以自定义个数.

        let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
        // 2:发送信号
        replaySub.onNext(1)
        replaySub.onNext(2)
        replaySub.onNext(3)
        replaySub.onNext(4)
        // 3:订阅序列
        replaySub.subscribe{
            print("订阅到了:",$0)
        }
        .disposed(by: disposbag)
        // 再次发送
        replaySub.onNext(7)
        replaySub.onNext(8)
        replaySub.onNext(9)

AsyncSubject

AsyncSubject只发送由源Observable发送的最后一个事件,并且只在源Observable完成之后。(如果源Observable没有发送任何值,AsyncSubject也不会发送任何值。)

        let asynSub = AsyncSubject<Int>.init()
        // 2:发送信号
        asynSub.onNext(1)
        asynSub.onNext(2)
        // 3:订阅序列
        asynSub.subscribe
            { print("订阅到了:",$0)
                
        }
        .disposed(by: disposbag)
        // 再次发送
        asynSub.onNext(3)
        asynSub.onNext(4)
        asynSub.subscribe
            { print("订阅到了:",$0)
                
        }
        .disposed(by: disposbag)
        asynSub.onCompleted()
  • 源码响应处理
    func _synchronized_on(_ event: Event<Element>) -> (Observers, Event<Element>) {
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isStopped {
            return (Observers(), .completed)
        }

        switch event {
        case .next(let element):
            self._lastElement = element
            return (Observers(), .completed)
        case .error:
            self._stoppedEvent = event

            let observers = self._observers
            self._observers.removeAll()

            return (observers, event)
        case .completed:

            let observers = self._observers
            self._observers.removeAll()

            if let lastElement = self._lastElement {
                self._stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement))
            }
            else {
                self._stoppedEvent = event
                return (observers, .completed)
            }
        }
    }
  • 可以很清晰的看出,普通Next事件都是元素的替换,根本没有响应出来. complete事件发送到时候,就会把最新保存的self._lastElement当成事件值传出去,响应.next(lastElement)

  • 如果没有保存事件就发送完成事件:.completed

  • error事件会移空整个响应集合:self._observers.removeAll()

BehaviorRelay

  • 可以储存一个信号

  • 随时订阅响应

  • 响应发送的时候要注意为, .accept()

        let behaviorRelay = BehaviorRelay(value: 100)
        behaviorRelay.subscribe { (result) in
            print(result)
        }
        .disposed(by: disposbag)
        print("打印:\(behaviorRelay.value)")
        behaviorRelay.accept(1000)

源码

public final class BehaviorRelay<Element>: ObservableType {
    private let _subject: BehaviorSubject<Element>

    /// Accepts `event` and emits it to subscribers
    public func accept(_ event: Element) {
        self._subject.onNext(event)
    }

    /// Initializes 
    public init(value: Element) {
        self._subject = BehaviorSubject(value: value)
    }
   }
  • 内部封装了BehaviorSubject,非常明显.