走进 RxSwift 之冷暖自知

2,795 阅读10分钟

瞎扯几句

前段时间身体跟心态都出了点问题,博客也很久没更新了。细心的朋友可能发现我的个人介绍换了,由原先高冷装逼的“谢绝转载”变为略显矫情的“人生谁能不迷茫”了。不知道大家有没有这样的经历,因为一些三言两语难以说清的理由,或者干脆就是无端地对代码产生了一些排斥情绪,下班后看不进书也不想碰代码。我经历了几天这样的日子,挺难受的,好在很快就走出来了。编程本身其实是件有趣的事,但任何事情,一旦将其作为职业,便也失了纯粹,总会有身不由己的时候。做个程序猿,也是如人饮水,冷暖自知。

关于标题

言归正传啊,今天还是想跟大家聊一聊 RxSwift ,之前我写过一篇 走进 RxSwift 之观察者模式,讲解了 RxSwift 的部分实现。今天这个标题还是以“走进 RxSwift”为开头,暗示着这将会是一个系列(感觉立了个 Flag 啊……)。至于冷暖自知呢,就有一点讲究了,可不仅仅是一句感慨。同为 FRP 框架的 RAC 中素有冷信号和热信号的概念,而且是两种不同的类型。其实 RxSwift 中的 Observable 也有冷热之分,但为什么提的人不多呢,官方文档是这么说的:

IMHO, I would suggest to more think of this as property of sequences and not separate types because they are represented by the same abstraction that fits them perfectly, Observable sequence.

RxSwift 认为不管是 Cold 还是 Hot,它们都是 Observable,它们同属于一个抽象,而不是两种独立的类型。这个观点很有意思,也就是说如果你自己声明了一个 Observable(遵守 ObservableType 协议),使用者并不知道它是 Cold 还是 Hot,只有你自己清楚,真正的“冷暖自知”。

使用场景

关于冷热 Observable 的异同,跟冷热信号的异同是类似的,网上已经有很多资料了,我就不详细展开了。简而言之,Cold Observable(以下简称 CO)只有在被订阅的时候才会发射事件,每次有新的订阅者都会把之前所有的事件都重新发射一遍; Hot Observable(以下简称 HO)则是实时的,一旦有新的事件它就发射,不管有没有被订阅,而新的订阅者并不会接收到订阅前已经发射过的事件。

一个姑娘,不管她冷若冰霜还是热情似火,总有人喜欢(毕竟只要漂亮就好0 0)。Observable 也是一样,冷热只是其特性,并没有优劣之分,它们都有各自的应用场景。HO 有点“推模型”的意思,它会 push 新的事件过来,一些实时性要求较高的场景(譬如各种响应事件、通知消息等),如果你要自己用 Rx 去封装的话可以用 HO(当然这些 RxCocoa 基本都帮你做了,而且严格来说也不算用了 HO,这个下文再叙)。而 CO 则有点“拉模型”的意思,只在需要的时候去 pull(subscribe),所以在封装网络请求和一些异步操作的时候,可以使用 CO。

实战小剧场

下面我随便举个例子,我们要完成的功能是发送网络请求,将返回的数据显示到一个 TableView 上。假设我们已经有了一个网络模块,它的调用接口大概长这样:

class Resource {
    typealias CompletionHandler = (Data?, Error?) -> Void

    func request(completion: CompletionHandler) {
        // ...
    }
}

这是一个非常典型的网络请求,在回调中处理返回数据。不过我们想用 Rx 对它进行一点小包装,让它直接返回一个 Observable。记得我前面说过吧,异步操作一般是用 CO:

extension Resource {
    func request() -> Observable {
        return Observable.create { observer in
            self.request() { (data, error) in
                if let error = error {
                    observer.onError(error)
                    return
                }
                if let data = data {
                    observer.onNext(data)
                }
                observer.onCompleted()
            }

            return Disposables.create()
        }
    }
}

顺便说一下,用Observable.createObservable.justObservable.empty等方法创建的一般都是 CO。好的,现在我们可以这样调用了:

let bag = DisposeBag()
let testResource = Resource()

testResource.request()
    .map(parse)
    .subscribe { print($0) }
    .addDisposableTo(bag)

为了便于测试,造点假数据打印一下:

func request(completion: CompletionHandler) {
    // test
    let data = Data()
    completion(data, nil)
}

func parse(data: Data) -> String { return "Test Data" }

这时顺利打印出了如下内容:

next(Test Data)
completed

到此为止网络请求已经没有问题了,现在只要把数据显示到 TableView 上就好了,我们稍加修改:

func parse(data: Data) -> [String] { return ["Test Data"] }

testResource.request()
    .map(parse)
    .bindTo(tableView.rx.items(cellIdentifier: "Cell", cellType: UITableViewCell.self)) { (_, element, cell) in
        cell.textLabel?.text = element
    }
    .addDisposableTo(bag)

好的,我们已经圆满完成了任务。然而生活永远不会这么一帆风顺,在你准备收拾收拾下班的时候,产品经理突然过来说,兄弟,我昨天喝多了脑子不灵清,忘了跟你说我们这个页面是要能刷新的……OK,不就是刷新么,好说,这就给你加上。于是你把请求数据并显示的这段代码放到一个函数里。但这时候你开始纠结了,函数名……该叫什么呢?fetchData?可这玩意儿不仅去查询了数据,还展示了数据啊,咦?说好的一个函数只做一件事呢……哎不管了,还要回家遛狗呢,先完成功能再说吧,于是你机智地写下了一个bindDataSource函数,在加载视图和用户下拉刷新的时候都调用这个函数:

func bindDataSource() {
    Resource().request()
        .map(parse)
        .bindTo(tableView.rx.items(cellIdentifier: "Cell", cellType: UITableViewCell.self)) { (_, element, cell) in
            cell.textLabel?.text = element
        }
        .addDisposableTo(bag)
}

你觉得万无一失,潇洒地按下 cmd + R,准备看一眼效果就走人。然而世事难料,有时在残酷的现实面前你不得不低下你高傲的头颅,你低头看了眼屏幕,发现在下拉的时候,触发了一个断言:

"Hint: Maybe delegate was already set in xib or storyboard and now it's being overwritten in code.\n")

这很好理解,应该是刷新的时候重复绑定 data source 了,显然只要在每次 bind 之前把 tableView 的 dataSource 置 nil 就行了。于是你在bindDataSource开头加了一句tableView.dataSource = nil,再次 cmd + R。Everything goes well!一切都在意料之中,你捋了捋一丝不乱的刘海,合上电脑回了家。

但这晚注定是个不平静的夜晚,你躺在床上辗转反侧,总觉得哪里不对劲。你的脑海中一直盘旋着tableView.dataSource = nil这句代码,为什么每次刷新都需要重新绑定 dataSource?这什么套路,怎么这么不按常理出牌?我白天的时候在想什么?就这样你陷入了无尽的自我拷问之中,你看了眼身旁熟睡的姑娘,起来蹑手蹑脚地打开了电脑,把代码改成了这样:

let response: Variable<[string]> = Variable([])

func fetchData() {
    Resource().request()
        .map(parse)
        .bindTo(response)
        .addDisposableTo(disposeBag)
}

func bindDataSource() {
    response.asDriver()
        .drive(tableView.rx.items(cellIdentifier: "Cell", cellType: UITableViewCell.self)) { (_, element, cell) in
            cell.textLabel?.text = element
        }
        .addDisposableTo(disposeBag)
}

在页面加载的时候调用fetchDatabindDataSource,而每次刷新页面的时候,调用fetchData就可以了。response既是 Observer 也是 Observable,它作为 Observer 订阅了网络数据的变化,每次一有新的数据,就发送新的事件,tableView 随之更新。很明显,Variable 是个 HO,它其实是 BehaviorSubject 的一个封装,顺便说一下,RxSwift 中的所有 Subject 都是 HO。

如何一眼区别 CO 和 HO

前面也说过了,不管是冷是热,它们都是 Observable,虽然我顺便提了几个 RxSwift 中典型的 CO 和 HO,但如果是一个自定义的 Observable 呢,要如何区分它是冷是热呢?嗯,看源码啊……毕竟源码面前没有秘密嘛。不过看源码也有点讲究,对于今天这个话题,其实只要看它实现的subscribe方法就可以了。之前我在 走进 RxSwift 之观察者模式中解释了Observable.empty的实现,虽然那是比较老的版本,不过核心原理是一样的。默认的 Observable 的实现是冷的,像之前例子中我用Observable.create创建了一个Observable实例,create方法的参数是一个闭包,这个闭包被赋值给一个属性,每当这个实例被订阅的时候,闭包就会被执行。我之前试着实现过一个简化版的 Rx 模型,可以用create方法创建一个 CO,用 Swift2.2 写的,大家可以稍微看下,领会精神:)

enum Event {
    case Next(T)
    case Completed
    case Error(ErrorType)
}

typealias CompletedHandler = () -> Void
typealias ErrorHandler = ErrorType -> Void

class Observable {
    typealias SubscribeHandler = Observer -> Void

    private let subscribeHandler: SubscribeHandler
    private init(handler: SubscribeHandler) {
        subscribeHandler = handler
    }

    static func create(subscribe: Observer -> Void) -> Observable {
        return Observable(handler: subscribe)
    }

    func subscribe(observer: Observer) {
        subscribeHandler(observer)
    }

    func subscribe(on: Event -> Void) {
        let anonymousObserver = Observer(handler: on)
        subscribe(anonymousObserver)
    }

    // todo
    // func subscribe(onNext: E -> Void) {}
    // func subscribe(onError: ErrorHandler) {}
    // func subscribe(onCompleted: CompletedHandler) {}
}

class Observer {
    typealias EventHandler = Event -> Void
    let eventHandler: EventHandler
    // todo
    // let nextHandler: E -> Void
    // let errorHandler: ErrorHandler
    // let completedHandler: CompletedHandler

    init(handler: EventHandler) {
        eventHandler = handler
    }

    func on(event: Event) {
        eventHandler(event)
    }

    // todo
    func onNext(element: E) {}
    func onCompleted() {}
    func onError(error: ErrorType) {}
}


let observable = Observable.create { (observer: Observer) in
    observer.on(.Next("Value"))
    observer.on(.Completed)
}

// Mark: Cold
observable.subscribe { event in
    switch event {
    case .Next(let value):
        print(value)
    case .Error(let error):
        print(error)
    case .Completed:
        print("Completed")
    }
}

observable.subscribe { event in
    print(event)
}

这段代码在 Xcode7 是能正常跑起来的,调用起来跟 RxSwift 也没什么不同,输出也没问题。大家想必也发现了,CO 一般是无状态的,它不会去维护一堆 Observers 或者一堆 Events 什么的,它就是一堆函数(或者说闭包),在被订阅的时候被调用,所以 CO 是比较符合 FP 的思想的。而 HO 呢,恰恰相反,我们看一下一个典型的 HO——PublishSubjectsubscribe方法:

public override func subscribe(_ observer: O) -> Disposable where O.E == Element {
    _lock.lock(); defer { _lock.unlock() }
    return _synchronized_subscribe(observer)
}

func _synchronized_subscribe(_ observer: O) -> Disposable where O.E == E {
    if let stoppedEvent = _stoppedEvent {
        observer.on(stoppedEvent)
        return Disposables.create()
    }

    if _isDisposed {
        observer.on(.error(RxError.disposed(object: self)))
        return Disposables.create()
    }

    let key = _observers.insert(observer.asObserver())
    return SubscriptionDisposable(owner: self, key: key)
}

这段代码看着复杂,但它的核心其实就一句:

let key = _observers.insert(observer.asObserver())

也就是把当前的订阅者加到一个订阅者集合中,而当有新的事件时,就发送给集合中所有的订阅者:

public func on(_ event: Event) {
    _lock.lock(); defer { _lock.unlock() }
    _synchronized_on(event)
}

func _synchronized_on(_ event: Event) {
    switch event {
    case .next(_):
        if _isDisposed || _stopped {
            return
        }

        _observers.on(event)
    case .completed, .error:
        if _stoppedEvent == nil {
            _stoppedEvent = event
            _stopped = true
            _observers.on(event)
            _observers.removeAll()
        }
    }
}

同样,这段代码的核心是:

_observers.on(event)

_observers的类型并不是 Swift 原生的某种集合类型,可能是出于性能考虑,RxSwift 定义了一个叫Bag的数据结构,但原理上是一样的。这个on方法就是给每个订阅者发送事件。

由上可得,HO 其实是比较典型的观察者模式,跟 target-action 啊 NSNotificationCenter 啊等等的实现原理是差不多的,都需要维护一个观察者集合。RxCocoa 为我们封装了各种事件响应,按理说应该是用 HO,但我看了代码发现并非如此:

public func controlEvent(_ controlEvents: UIControlEvents) -> ControlEvent {
    let source: Observable = Observable.create { [weak control = self.base] observer in
        MainScheduler.ensureExecutingOnScheduler()

        guard let control = control else {
            observer.on(.completed)
            return Disposables.create()
        }

        let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) {
            control in
            observer.on(.next())
        }

        return Disposables.create(with: controlTarget.dispose)
        }.takeUntil(deallocated)

    return ControlEvent(events: source)
}

这个controlEvent方法是非常关键的,像 UIButton 的 rx.tap 其实就是调用了这个方法,我们发现这里是用create方法创建了一个 Observable,ControlEvent 其实是个壳而已,真正工作的还是这个source,它只是保证让操作发生在主线程:

public struct ControlEvent : ControlEventType {
    public typealias E = PropertyType

    let _events: Observable

    public init(events: Ev) where Ev.E == E {
        _events = events.subscribeOn(ConcurrentMainScheduler.instance)
    }

    public func subscribe(_ observer: O) -> Disposable where O.E == E {
        return _events.subscribe(observer)
    }
    // ...
}

看这个subscribe,就是调用了之前create出来的sourcesubscribe而已,所以它也显然是个 CO。这就有点尴尬了……这明显是个用 HO 的场景啊……让我们回头看,玄机其实在这一句:

let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) {
    control in
    observer.on(.next())
}

让我们看下ControlTarget是哪条道上的,看一下初始化函数:

init(control: Control, controlEvents: UIControlEvents, callback: @escaping Callback) {
    MainScheduler.ensureExecutingOnScheduler()

    self.control = control
    self.controlEvents = controlEvents
    self.callback = callback

    super.init()

    control.addTarget(self, action: selector, for: controlEvents)

    let method = self.method(for: selector)
    if method == nil {
        rxFatalError("Can't find method")
    }
}

我们看重点这一句:

control.addTarget(self, action: selector, for: controlEvents)

是不是很眼熟?没错,这就是我们平常用的那个addTarget。所以 UIKit 已经有在维护一个观察者集合了,本身已经是“热”的了,Rx 就没必要再去加把火了了。

最后

嗯……本期大致就是如此了,一些细枝末节我就不再赘述了,有兴趣的朋友大可以自己去看看源码。最后,祝您身体健康,生活愉快,再见!