RxSwift--Subject底层原理分析及应用

1,058 阅读3分钟

一、Subject概念

  Subject作为一个代理者,既可以作为Observer,又可以作为Observable。作为Observer,它可以订阅一个或多个Observable;作为Observable,它可以转发收到来自Observer的数据,也可以发送新的数据。

二、Subject分类

  根据不同的场景,一共分为四种类型的Subject:PublishSubjectBehaviorSubjectReplaySubjectAsyncSubject。下面分别对其底层原理进行分析。

1.PublishSubject

  PublishSubject只会把在订阅之后来自原始Observable的数据发送给观察者。

底层源码分析:

  • 初始化:进入PublishSubject类,初始化函数init中没做什么操作,但是我们可以找到publish函数的重写:
public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

  该subscribe函数的重写,主要进行加锁、同步订阅、解锁操作。加锁保证订阅流程的安全性
(注:源码分析中,一般if let… guard let…等操作都是判断空的操作,不需要重点分析,其它部分才是关键分析部分)

  • 收集观察者回调_synchronized_subscribe函数:
func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

  主要进行收集观察者回调:self._observers.insert(observer.on) ,observers为一个观察者集合。

  • 发送信号:.onNext,必然会调用父类ObserverType中的onNext函数:
public func onNext(_ element: E) {
        self.on(.next(element))
    }
  • 执行self.on函数便会进入PublishSubject的on函数:
public func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }

  主要进行分发当前的event:dispatch(…, event),即在_synchronized_on方法中对event进行switch分发。

  • 最终在ViewController.swift中subscribe中接收回调数据。

  发送信号过程可简单理解为:取出observers->调用observer.on方法。

  回头再理解一下PublishSubject的含义:因为订阅之前observers集合为空,空集合当然不会调用observer.on方法,也就不会在subscribe中收到回调数据。

2.BehaviorSubject

  BehaviorSubject会发送订阅之前最后一个数据给观察者。

底层源码分析:
  逻辑流程与PublishSubject大致相同,只是多了一个element变量:

private var _element: Element

  用于保存subscribe之前的最后一个event。
在这里插入图片描述

3.ReplaySubject

  ReplaySubject会发送指定个数的来自原始Observable的数据给观察者,无论它们是何时订阅的。

底层源码分析:

  • 初始化中返回了指定bufferSize大小的ReplayMany类:
    在这里插入图片描述
  • ReplayMany继承ReplayManyBase,定义了queue变量,存放指定size的event,并实现入队列、出队列操作方法,进行添加、删除event等操作。
    在这里插入图片描述
  • 在_synchronized_on函数中,在.next中调用addValueToBuffer函数添加event,.error、.completed中调用trim函数删除event。
    在这里插入图片描述

  其它响应流程类似PublishSubject。

4.AsyncSubject

  AsyncSubject只发送由源Observable发送的最后一个事件,并且值在源Observer完成(completed)之后。如果发生error,则不发送任何事件。

底层源码分析:
  流程类似PublishSubject,只是在_synchronized_on中做了处理,对于error事件,清空observers,不回调任何event。对于completed事件,只发送订阅之前最后一个event。
在这里插入图片描述

三、Subject应用

  对于Subject的应用见Github:RxSwift_Demo/RxSwift_Subject