阅读 355

RxSwift高阶函数之publish

publish 会将 Observable 转换为 可被连接的Observable可被连接的Observable 和 普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样,你就可以 Observable 在何时开始发出元素。

示例:

let netOb = Observable<String>.create { (observer) -> Disposable in
    
    print("--模拟请求网络开始--")
    sleep(2)// 模拟网络延迟
    print("--模拟网络请求成功--")
    
    observer.onNext("请求到网络数据")
    
    return Disposables.create()
    }
    .publish()

_ = netOb.subscribe({ print("1-订阅到:\($0)") })
    .disposed(by: disposeBag)

_ = netOb.subscribe({ print("2-订阅的:\($0)") })
    .disposed(by: disposeBag)

_ = netOb.connect()
复制代码
打印结果:
--模拟请求网络开始--
--模拟网络请求成功--
1-订阅到:next(请求到网络数据)
2-订阅的:next(请求到网络数据)
复制代码

分析打印结果,可以发现,虽然对序列订阅了两次,但是只发送了一次信号,并且两个观察都有响应。如果没有对 Observable 执行 publish() 和 最后的 netOb.connect() ,那么按照RxSwift核心逻辑中的分析,就会在两个观察者订阅的时候,都会发送信号,就会发送多次信号。

那么我们就来分析一下,publish 是如何让多次订阅只发送一次信号的。

查看 publish 源码:

通过源码,可以知道,publish 函数内部,调用了 multicast 函数,并且有一个尾随闭包作为函数参数。而尾随闭包中的代码块作用是初始化一个 PublishSubject 实例对象。

再查看 multicast 函数源码。

其以可观察序列和 闭包作为参数,初始化 ConnectableObservableAdapter 对象。
ConnectableObservableAdapter 初始化时仅将源序列保存在 self._source 和闭包保存在 self._makeSubject 中。

订阅信号 subscribe

对源序列执行 publish() 后,源序列变为 ConnectableObservableAdapter,再对其进行订阅,会执行 ConnectableObservableAdapter.subscribe 函数。

subscribe 函数会执行 self.lazySubject.subscribe(observer),即会对 self.lazySubject 进行订阅。 而 self.lazySubject 从命名上看就知道,这个一个懒加载属性。 因为初始化时 self._subject 为nil,所以会调用 self._makeSubject 闭包。回调结果是初始化一个 PublishSubject 的实例。

因此,self.lazySubject 其实是 PublishSubject 对象。 也即是对 PublishSubject 对象进行订阅 subscribe。 又因为 PublishSubject 仅初始化,并没有创建序列,所以,并不会发出信号。同时,由于 PublishSubject 的特性,会把 观察者 保存起来,所以并不会丢失源序列的观察者PublishSubject 具体分析请参阅RxSwift之Subject

连接 connect

在需要发送信号的时候,则执行 connect() 函数开始发送信号。查看 connect 源码。

通过查看源码 self._lock.calculateLocked 会直接执行闭包,所以会创建 Connection 对象,并对源序列 self._source 进行订阅。

根据 RxSwift核心逻辑简介 中的分析可以知道调用 self._source.subscribe(connection) 最终调用 connection.on 函数。

最后会调用 self._subjectObserver.on(event),分析可知,即调用 PublishSubject.on(event),实现对所有观察者发送信号。详细分析请参阅RxSwift之Subject

以上即为 publish 的底层分析,其内部主要使用了 PublishSubject 保存多个观察者,实现多次订阅,一次发送的目的。

上面的分析中若有不足,请评论指正。