RxSwift(八)调度者Schedulers 核心逻辑解析

964 阅读7分钟

前言

调度者(Schedulers)是 RxSwift 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。本文将探索Schedulers是如何控制线程或队列的过程的实现原理,篇幅较长,感兴趣的同学可以一步步跟着探索下去。

提出疑问

如下代码,在异步队列子线程执行rx的点击监听,然后打印当前线程,竟然打印出来的是点击了按钮 --- <NSThread: 0x600000c2d980>{number = 1, name = main}当前线程是主线程,这是怎么做到的呢?

DispatchQueue.global().async {
    self.actionBtn.rx.tap
        .subscribe(onNext: { () in
            print("点击了按钮 --- \(Thread.current)")
        })
        .disposed(by: self.bag)
}

开始探索

一样的按住cmd点击进入tap里,可以看到返回的是controlEvent(.touchUpInside),再进去看controlEvent

public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
    let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
            MainScheduler.ensureRunningOnMainThread()
            //省略
}
            
  public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
        #if !os(Linux) // isMainThread is not implemented in Linux Foundation
            guard Thread.isMainThread else {
                rxFatalError(errorMessage ?? "Running on background thread.")
            }
        #endif
    }

可以发现这里有判断必须要在主线程执行,但是还没看到调度的代码,然后发现上面的第一个方法里返回的是一个ControlEvent,进入ControlEvent看一下

 public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element {
        self._events = events.subscribeOn(ConcurrentMainScheduler.instance)
    }

可以看到subscribeOn订阅在ConcurrentMainScheduler.instance主线程上,这时可以清楚的明白是切换到了主线程,不过还是需要明白subscribeOn到底干了些什么呢?

final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType
    
    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

到这里很明确了,它是继承ObservableTypeProducer的,在前几篇文章分析过的RxSwift的核心逻辑,其实这里切换了一下,整个的过程是: 订阅序列->创建observer->Producer里的SubscribeOnrun->SubscribeOnSink.runon->调用observer.on执行sinkon->sink里保存的eventHandler响应event事件->最后调用外界的subscriber的闭包

对于上面的流程不了解的可以看之前的文章,可以断点每个步骤,了解核心流程。

调度者Schedulers核心源码

写如下代码,使用observeOn(SerialDispatchQueueScheduler.init(qos: .background)),这里一般是执行串行队列的需要使用到的,里面是封装了GCD的队列 ,我们点击observeOn进去看看里面的代码

    Observable.of(1,2,3,4,5)
            .observeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "observeOnSerial"))
            .subscribe{print("observeOn",$0,Thread.current)}
            .disposed(by: self.bag)
  1. 很明显,我们在外面调用的是SerialDispatchQueueScheduler,所以这里会返回一个ObserveOnSerialDispatchQueue的新序列,当然是要点进去看看
 public func observeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
            if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
            }
            else {
                return ObserveOn(source: self.asObservable(), scheduler: scheduler)
            }
    }
  1. 这里把外界传递过来的SerialDispatchQueueSchedulersource源序列都保存起来了,同时还继承了Producer,实现了run方法
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
    let scheduler: SerialDispatchQueueScheduler
    let source: Observable<Element>

    init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
        self.scheduler = scheduler
        self.source = source

        #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
            _ = increment(_numberOfSerialDispatchQueueObservables)
        #endif
    }

    //省略
}
  1. 外界调用subscribe时,就会走这里的run方法(我这里省略了走到这里的原因,因为在之前探索核心逻辑已经探索的很清楚了) 我们看到,self.source.subscribe(sink)是订阅源序列,然后传递一个ObserveOnSerialDispatchQueueSink的管子进去

     override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
    
  2. cmd然后点击self.source.subscribe,弹出一堆可进入的方法,这里肯定是会执行Producersubscribe,所以选择它进入,可以在这处打个断点确认一下。

  3. 看到这里CurrentThreadScheduler.isScheduleRequired,根据这个标识符CurrentThreadScheduler.isScheduleRequired去找执行的方法

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    if !CurrentThreadScheduler.isScheduleRequired {   // 后续走这里
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
        return disposer
    }
    else {    // 第一次走这里
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            return disposer
        }
    }
}
  1. 我们点进CurrentThreadScheduler.instance.schedule,可以看到其实这里只是多了一个标记isScheduleRequiredfalse的步骤
    public func schedule<StateType>(_ state: action: ) -> Disposable {

    if CurrentThreadScheduler.isScheduleRequired {
      // 已经标记,就置false
        CurrentThreadScheduler.isScheduleRequired = false
     // 外界闭包调用执行
        let disposable = action(state)
      // 延迟销毁 
        defer {
            CurrentThreadScheduler.isScheduleRequired = true
            CurrentThreadScheduler.queue = nil
        }
      // 先省略。。。
        return disposable
    }
     // 先省略。。。
    return scheduledItem
}
  1. 再看到self.run(observer, cancel: disposer),点击进去,选择ObservableSequence,又会回到第三步的run方法
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
       let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
       let subscription = sink.run()
       return (sink: sink, subscription: subscription)
   }

点击进入到 ObservableSequenceSink->run

  func run() -> Disposable {
       return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
           var mutableIterator = iterator
           if let next = mutableIterator.next() {
               self.forwardOn(.next(next))
               recurse(mutableIterator)
           }
           else {
               self.forwardOn(.completed)
               self.dispose()
           }
       }
   }

继续点击进入ImmediateSchedulerType里的scheduleRecursive,在这里可以看到,创建了RecursiveImmediateScheduler,来保存外界传递过来的闭包,而且调用了recursiveScheduler.schedule(state)并执行闭包

public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
       let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
       
       recursiveScheduler.schedule(state)
       
       return Disposables.create(with: recursiveScheduler.dispose)
   }
  1. 点击进入recursiveScheduler.schedule(state),在这个递归调度者里
  • 这里因为在递归环境,加了一把锁递归锁,保障安全
  • 通过保护,获取action执行,也就是外界传给递归调度者的闭包任务
  • 因为加了一把递归锁,所以保证了RxSwift的调度是有顺序的
func schedule(_ state: State) {
   var scheduleState: ScheduleState = .initial
   let d = self._scheduler.schedule(state) { state -> Disposable in     
       // 这里因为在递归环境,加了一把锁递归锁,保障安全   
       let action = self._lock.calculateLocked { () -> Action? in
                return self._action
       }
       
       if let action = action {
           action(state, self.schedule)
       }
       
       return Disposables.create()
   }
 // 篇幅,先省略,大家自行查阅  
}
  1. 点击进入self._scheduler.schedule(state),之前我们在外界使用的是SerialDispatchQueueScheduler,所以进入它里面的schedule
 public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
       return self.scheduleInternal(state, action: action)
   }

self.scheduleInternal(state, action: action)-> scheduleInternal->self.configuration.schedule,可以看到如下代码,这里异步执行了cancel.setDisposable(action(state))

func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
       let cancel = SingleAssignmentDisposable()

       self.queue.async {
           if cancel.isDisposed {
               return
           }
           cancel.setDisposable(action(state))
       }

       return cancel
   }

其实这里执行的就是第8步这里的RecursiveImmediateScheduler里的

 if let action = action {
           action(state, self.schedule)
       }
  1. 再回到第7步ObservableSequenceSink里的run,上面的action调用尾随闭包,即调用了self.forwardOn,点进去发现其实调用的就是self._observer.on. 这里在之前的核心逻辑分析过
     func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
  1. 然后会调用到第三步里创建的ObserveOnSerialDispatchQueueSinkonCore
        override func onCore(_ event: Event<Element>) {
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }
    
    看到这里发现很熟悉,继续点进去
      public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.scheduleInternal(state, action: action)
    }
    
    func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.configuration.schedule(state, action: action)
    }
    
    最终又来到了这里,在当前队列异步执行任务,调用action(state), 所以,我们在最外界调用on(Event)响应的时候,就会在外界指定的schedule里执行闭包了
    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
    
        self.queue.async {
            if cancel.isDisposed {
                return
            }
            cancel.setDisposable(action(state))
        }
    
        return cancel
    }
    

总结

其实就是创建了一个中间序列ObserveOnSerialDispatchQueueSink,继承自Producer,实现了run方法,保存了源序列传递进来的observerschedule。在外界订阅的时候会执行一套producer的run,还会执行sink.run,然后由传递进来的observer响应on(event)

所以其实就是两层序列订阅响应,第二层sink就相当于源序列的observer观察者,其实和RxSwift核心逻辑那里非常相似,只是多了个中间的序列来处理线程的调度。掌握了我之前写过的核心逻辑,其实后面都是类似的,乍看有难度,上手探索之后发现很好玩,很简单,很有意思!