RxSwift核心之调度器Schedulers

651 阅读4分钟

Schedulers 是RxSwift实现多线程的核心模块。它主要用于控制任务在哪个线程或队列运行。

小伙伴在平时的开发过程中,肯定都使用过网络请求,网络请求是在后台执行的,获取到数据之后,再在主线程更新UI。

来一段网络请求伪代码。

DispatchQueue.global(qos: .userInitiated).async {
    
    let data = try? Data(contentsOf: url)
    
    DispatchQueue.main.async {
        // 更新UI
    }
}

如果用RxSwift来实现上面的网络请求,则大致是这样的:

let rxData: Observable<Data> = ...

rxData
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { [weak self](data) in
        // 更新UI
    })
    .disposed(by: disposeBag)

说明:

  1. 我们用 subscribeOn 来决定数据序列的构建函数在哪个 Scheduler 上运行。 在上面的例子中,由于获取 Data 需要花费很长的时间,所以用 subsribeOn 切换到 后台Scheduler 来获取 Data 。这样就可以避免阻塞主线程。
  2. 我们用 observeOn 来决定在哪个 Scheduler 监听这个数据序列。 在上面的例子中,通过 observerOn 方法切换到主线程来监听并处理结果。

下面👇介绍一下RxSwift中的几种调度器

MainScheduler

MainScheduler 代表 主线程。如果需要执行和UI相关的任务,就需要切换到该 Scheduler 运行。

查看其源码:

可以清晰的知道,在初始化时,在 MainScheduler 对象的内部,绑定了主队列 DispatchQueue.main

SerialDispatchQueueScheduler

SerialDispatchQueueScheduler 抽象了 串行DispatchQueue。如果需要执行一些串行任务,可以切换到这个 Scheduler 执行。

查看其源码:

在初始化 SerialDispatchQueueScheduler 对象时,需要传入一个 DispatchQueue,保存在 self.configuration 结构体中。

ConcurrentDispatchQueueScheduler

ConcurrentDispatchQueueScheduler 抽象了 并行DispatchQueue。如果需要执行一些并发任务,可以切换到这个 Scheduler执行。

查看其源码:

ConcurrentDispatchQueueSchedulerSerialDispatchQueueScheduler 思路是一样的。也是在初始化时使用 self.configuration 保存了传入的 DispatchQueue 对象。

OperationQueueScheduler

OperationQueueScheduler 抽象了 NSOperationQueue。它具备一些 NSOperationQueue 的特点。例如,可以通过设置 maxConcurrentOperationCount 来控制同时执行并发任务的最大数量。

查看其源码:

在初始化 OperationQueueScheduler 对象时,需要传入 OperationQueue优先级queuePriority,作为初始化参数。

Scheduler的调度执行

从上一小节的几种调度器的源码可以发现,所有的调度器 Scheduler 都继承自 ImmediateSchedulerType 协议。

而这个协议只声明了一个 schedule 方法,而通过注释可以知道,在调度器调度执行的时候都会调用这个 schedule 方法。

SerialDispatchQueueScheduler 调度器为例:

public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.configuration.schedule(state, action: action)
}
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
    return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}

以上四个方法,都是 SerialDispatchQueueScheduler 调度器中的调度方法。

分析以上方法会发现,最终都会调用 self.configuration 的某个方法。而且查看几种调度器的源码可以知道,Scheduler 中都有一个重要的属性 let configuration: DispatchQueueConfiguration。其中保存了我们需要的队列和leeway信息。

那么,我们就来分析 DispatchQueueConfiguration 中的方法。

首先分析 schedule 方法,虽然 schedule 方法中只有寥寥几句代码,但是也清晰的展示其 核心逻辑就是在当前队列下面,异步调度执行了闭包 action(state)

我们再来看其他方法:

func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
    let deadline = DispatchTime.now() + dispatchInterval(dueTime)
    
    let compositeDisposable = CompositeDisposable()
    
    let timer = DispatchSource.makeTimerSource(queue: self.queue)
    timer.schedule(deadline: deadline, leeway: self.leeway)
    
    // 因篇幅原因,省略部分代码 ...
    timer.setEventHandler(handler: {
        if compositeDisposable.isDisposed {
            return
        }
        _ = compositeDisposable.insert(action(state))
        cancelTimer.dispose()
    })
    timer.resume()
    
    _ = compositeDisposable.insert(cancelTimer)
    
    return compositeDisposable
}
func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
    let initial = DispatchTime.now() + dispatchInterval(startAfter)
    
    var timerState = state
    
    let timer = DispatchSource.makeTimerSource(queue: self.queue)
    timer.schedule(deadline: initial, repeating: dispatchInterval(period), leeway: self.leeway)
    
    // 因篇幅原因,省略部分代码 ...
    
    timer.setEventHandler(handler: {
        if cancelTimer.isDisposed {
            return
        }
        timerState = action(timerState)
    })
    timer.resume()
    
    return cancelTimer
}

以上两个方法中虽然没有直接在当前队列中异步调用闭包,但是创建 timer 时,却是在当前队列中创建的,因此 timer 回调时也是在当前队列执行 eventHandler,间接实现当前队列下的调度。

以上则是调度器 Scheduler 的介绍及简单的调度分析,若有不足之处,请评论指正。

附图解一张