RxSwift核心逻辑01 -- Observable Observer

205 阅读8分钟

思想

“一千个厨师做鱼香肉丝,会做出一千种味道”——杜甫
做鱼香肉丝的工序是固定的,但是随着条件不同则会有不同的结果,甚至出现制作失败的情况(error)。
这个过程用用RxSwift写的话基本是下面这个样子:

        let 切菜 = 工序<Any>.create { (厨师) -> 收拾厨房的人 in
            切菜();
            厨师.做.给下一个人("切好的菜");
            厨师.做.完成();
            厨师.做.失败("没有刀!");
            return 收拾厨房的人.create();
        }

        let  炒菜 = 工序<Any>.create { (厨师) -> 收拾厨房的人 in
            炒菜();
            厨师.做.给下一个人("炒好的菜");
            厨师.做.完成();
            厨师.做.失败("没有锅!");
            return 收拾厨房的人.create();
        }
        
       let 鱼香肉丝 = 切菜.flatMap{(Any) -> 工序<Any> in 
               return 炒菜;
        };
        
        鱼香肉丝.制作(给下一个人: { (半成品) in
            装盘(未装盘的菜);
        }, 完成: {
             print(这个菜抄完了);
        }, 失败: {(error) in 
             print(这个菜没做完是因为:\(error));
            // 这个菜没做完是因为:没有刀 or 没有锅
        });

这里我们可以知道RxSwift的一个基本思路是,创建一个任务,创建这个任务的时候要把任务的结果、是否完成 以及 善后考虑好。然后在要做这个任务的时候,直接开启这个任务。就是在开启这个任务的时候,内部会自动分配一个匿名的执行者去执行这个任务。
这里面有3个主要成员:菜,厨师,收拾厨房的人。

然后看一段简单的正宗RxSwift调用
代码1.1:

        let requestTask = Observable<Any>.create { (obserber) -> Disposable in
            obserber.on(.next("返回数据1"));
            obserber.onNext("返回数据2");
            return Disposables.create();
        }
        
        
        let _ = requestTask.subscribe(onNext: { (text) in
            print("执行结果:\(text)");
        });

结构

这里面有三个核心


图1

*注:该图非官方,是笔者自己画的
绿色:Swift语言中的extension。
红色:枚举类型
蓝色:结构体

通过图1我们可以看到RxSwift里面主要有三个核心协议,即<ObservableConvertibleType>的子类<ObservableType>,<ObserverType>和< Disposable >。用通俗的话讲就是“需要干的活的协议”,“干活的人的协议” 以及 “善后的人的协议”。

是不是被这么多类吓到了?其实稍微划分一下就很明朗了:


图2

总共有五大部分,每个部分都有一个主要的类(用红框标出的)。
下面我们一个一个的来细细分析

注意:请一定要看清楚是observable还是observer
注意:请一定要看清楚是observable还是observer
注意:请一定要看清楚是observable还是observer

分析

0 - asXXXX

整个Rx内部有着大量的asXXX的方法,这里放在前面统一来说:
随着不停地子类化,到后面履行该协议的类可能不止一个身份,也会有其他身份。为了可读性和逻辑性这里才有了这个方法协议。
举个例子:小明原先是程序员但是他转行成了一名厨师,所以小明现在的身份是一名厨师,厨师敲代码这很变扭但是:

小明.as程序员.敲代码()

就很合理了。

1 - Event

通过箭头的指向的个数可以看出这个枚举是一个核心。这个枚举表示了一个事件的三种情况,即next, complete, error。这里面next和error都绑定了关联值。

2.1 -

“干活的人”的基本协议。

  1. func on(_ event: Event<E>)
    作为一个响应者(干活的人)需要遵守的最基本协议,这里面只包含了这一个方法,而参数则是上面我们提到的Event。

  2. public func onNext(_ element: E) ,public func onCompleted() ,public func onError(_ error: Swift.Error)
    对于2.1.1的扩展,通过源码我们可以知道仅仅是在发送相应的枚举值。并且在onNext和onError的时候,把参数赋值到枚举的关联值上。

2.2 - public struct AnyObserver

作为本篇唯一的结构体,之所以用结构体是因为它不需要被继承而且无需做析构处理。这里只有一个属性:(Event<Element>) -> Void类型的observer。

  1. public init<O : ObserverType>(_ observer: O) where O.E == Element
    这个方法的实现是:
self.observer = observer.on

这里非常巧妙,因为on方法的方法原型和observer的闭包类型都是

 (Event<Element>) -> Void

所以是可以赋值的。

  1. public func on(_ event: Event<Element>)
    这里实现了协议的on方法,就是以event为参数调用自己的闭包对象observer。

3 - final class AnonymousObserver

基本属于2.2的加强版,因为不是结构体而是类,所以有了析构处理,而且本身也是继承自class ObserverBase<ElementType> 类。这里基本逻辑和2.2基本类似,唯一不同的是他多了<Disposable>协议的实现,我们会在后面的文章中专门讲<Disposable>的,这里不做过多赘述。

4.1 - < ObservableType >

“需要干的活”的基本协议。

  1. func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
    -- 作为所有Observable的最根本的协议,当然包含了最基本的功能subscribe。用上面做菜的例子说就是“一道菜(Observable)最起码是能被一个厨子(Observer)制作(subscribe)出来的,并且返回一个收拾厨房的人(Disposable),在适当的时候收拾残局”

  2. public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E>
    -- 该协议扩展的简单工厂方法,通过一个闭包来快速创建一个AnonymousObservable类的实例。(AnonymousObservable后面会讲到)

  3. public func subscribe(_ on: @escaping (Event<E>) -> Void) -> Disposable
    -- 对于方法1的扩展,通过一个(Event<E>) -> Void的逃逸闭包作为参数,来创造一个实现了ObserverType的类AnonymousObserver的实例,然后调用方法1。

  4. public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable
    -- 看似很复杂,但是其实真正的核心代码只有中间的AnonymousObserver创建,就是把作为参数的onNext,onError,onCompleted的闭包在对应枚举值的情况下传入关联值并且回调。而onDisposed闭包则是用来创建对一个的disposable,在适当的时候调用disposable.dispose()。在最后创建Disposables时,将先前的AnonymousObserver的对象作为参数调用自己的subscribe方法。

4.2 - class Producer

  1. func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element
    -- 新添加的抽象方法。通过一个ObserverType的子类的对象和一个Cancelable(Disposable的子类)的对象为参数,返回两个Disposable的对象。

  2. override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element
    -- 协议方法的具体实现,里要注意的是下面这段代码:

let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)

这个方法自己生成了一个disposer,然后把这个disposer和observer作为参数调用了一下自己的run方法。

4.3 - final private class AnonymousObservable

  1. _subscribeHandler 和 init

这个final类添加了一个属性

let _subscribeHandler: (AnyObserver<Element>) -> Disposable

而构造方法则是把这个属性赋值

init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
}

再结合4.1.2我们就可以知道这个_subscribeHandler,就是我们调用creat的时候传过来的block。也就是代码1.1中的

obserber.on(.next("返回数据1"));
obserber.onNext("返回数据2");
return Disposables.create();
  1. override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element
    -- 注意这两段代码:
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)

说明了两点:
1) AnonymousObservable的run方法其实是调用的AnonymousObservableSink的run方法。
2)第一行说明了AnonymousObservableSink类的实例对象中可能包含了observer以及dispose。第二行说明了AnonymousObservableSink类的实例对象中又可能包含了observeable。
换句话说AnonymousObservableSink类是上面提及到的<Observer Type>,< ObservableType > 以及后面会提及的< Disposable >的汇合点。他是把“要干的活”,“干活的人”以及“处理者”给汇合到一起并且协调的那个枢纽

5. - final private class AnonymousObservableSink

当大家看到这个类的两个方法run和on的时候,有没有感觉前面的一切都串联到了这里。因为run是Observable才有的方法,而on则是Observer才有的方法,这里一个类就包含了这两个方法,足以说明这个类作为枢纽的身份。

  1. func on(_ event: Event<E>)
    --通过父类的方法我们可以看出来,这个方法本质是调用observer的on,但是会根据上下文来适当的调用dispose。

  2. func run(_ parent: Parent) -> Disposable
    --其实可以拆成两行代码

let anyObserver = AnyObserver(self);
parent._subscribeHandler(observer);

大家注意,这里的anyObserver和自身属性的observer是有区别的。自身的observer是AnonymousObserver类(3)的对象。而anyObserver则是结构体AnyObserver(2.2)的对象。(具体区别在前边讲AnonymousObserver类的时候有提到)。
这两句代码就是生成一个AnyObserver的对象并且以其作为参数调用上面的_subscribeHandler闭包。
还是拿代码1.1为例。

        let requestTask = Observable<Any>.create { (obserber) -> Disposable in
            obserber.on(.next("返回数据1"));
            obserber.onNext("返回数据2");
            return Disposables.create();
        }

这里的obserber就是AnyObserver(self)。

总结

整体RxSwift的核心很绕也很迷人,其实只要记住
Obserable - subscribe(Observer) - run(Observer);
⬆️
on(Event)---Sink-------run(AnyObserver)
⬆️
Observer - on(Event);
并且结合源码和UML图还是相对容易理清思路的。


作为一个开发者,有一个学习的氛围和一个交流圈子特别重要,这是我的交流群(111),大家有兴趣可以进群里一起交流学习

收录:原文地址



如果觉得写得不错 给我一个赞谢谢