RxSwift中的Timer

4,560 阅读4分钟

RxSwift中的Timer

我们在项目中经常会用到定时器,先来看下swift中使用定时器的几种方式:

Timer

//第一种写法
timer1 = Timer.init(timeInterval: 1,
                    target: self,
                    selector: #selector(timerFire),
                    userInfo: nil,
                    repeats: true)
RunLoop.current.add(timer1, forMode: .common)

//第二种写法
timer = Timer.scheduledTimer(withTimeInterval: 1,
                            repeats: true,
                            block: { (timer) in
    print("定时中。。。")
})
RunLoop.current.add(timer, forMode: .common)

这两种只是写法不同而已,第一种初始化的方式创建的timer需要手动添加到runloop中才能启动。第二种scheduled开头的方法,会默认把timer加入到当前runloopdefault模式下。但它们都需要改成common模式,runloop才会在滚动视图时同时也响应定时器。这个timer的生命周期以及内存泄漏风险都需要我们自己管理。

CADisplayLink

displayLink = CADisplayLink(target: self, selector: #selector(timerFire))
displayLink?.preferredFramesPerSecond = 1
displayLink?.add(to: RunLoop.current, forMode: .common)

CADisplayLinkTimer差不多,是根据屏幕刷新频率来的,也是需要添加在common模式下。

DispatchSourceTimer

gcdTimer = DispatchSource.makeTimerSource()
gcdTimer?.schedule(deadline: DispatchTime.now(),
                    repeating: DispatchTimeInterval.seconds(1))
gcdTimer?.setEventHandler(handler: {
    print("定时中。。。")
})
gcdTimer?.resume()

CGD中的定时器在使用时,不受主线程runloop的影响。它会在自己所在的线程中一直执行下去,直到你suspend或者cancel掉它。而且还可以指定handler所执行的线程。

RxSwift的Timer

let _ = Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler())
    .subscribe(onNext: { (state) in
        print(state)
    })
    .disposed(by: disposeBag)

首先,这个timer也是不用关心runloop方面的问题。

进去看看这个interval函数:

public static func interval(_ period: RxTimeInterval, 
                            scheduler: SchedulerType) -> Observable<Element> {
    return Timer(
        dueTime: period,
        period: period,
        scheduler: scheduler
    )
}

参数1:RxTimeInterval,就是DispatchTimeInterval的别名。public typealias RxTimeInterval = DispatchTimeInterval

参数2:调度者,MainScheduler()的视线中能看出,我们创建的是一个主线程调度者。

public init() {
    self._mainQueue = DispatchQueue.main
    super.init(serialQueue: self._mainQueue)
}

返回值是一个初始化的Timer对象,timer中保存了这些参数值,到此为止。

final private class Timer<Element: RxAbstractInteger>: Producer<Element> {
    fileprivate let _scheduler: SchedulerType
    fileprivate let _dueTime: RxTimeInterval
    fileprivate let _period: RxTimeInterval?

    init(dueTime: RxTimeInterval, period: RxTimeInterval?, scheduler: SchedulerType) {
        self._scheduler = scheduler
        self._dueTime = dueTime
        self._period = period
    }
}

这个timerProducer的子类,也是一个序列。timer中的泛型要求遵守RxAbstractInteger协议的。这个协议也是个别名,public typealias RxAbstractInteger = FixedWidthIntegerFixedWidthInteger协议要求遵守它的实例都是用固定大小的整数类型。所以我们在创建序列的时候把泛型指定为Int

在创建了timer后,我们又进行了订阅,熟悉RxSwift核心逻辑的话,就会清楚订阅信号后,内部会创建一个匿名的观察者,然后返回创建的销毁者中会调用timer序列的订阅函数,这里又根据你指定的线程在不同的分支中走了timer序列run

func run<Observer: ObserverType>(_ observer: Observer, 
                                cancel: Cancelable) -> (sink: Disposable, 
                                                        subscription: Disposable) {
    if self._period != nil {
        let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    } else {
        ......
    }
}

这里还是RxSwift核心逻辑熟悉的路子,创建TimerSink并调用run

final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger  {
    typealias Parent = Timer<Observer.Element>

    private let _parent: Parent
    private let _lock = RecursiveLock()

    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        return self._parent._scheduler.schedulePeriodic(0 as Observer.Element, 
                                                        startAfter: self._parent._dueTime,  
                                                        period: self._parent._period!) 
        { state in
            self._lock.lock(); defer { self._lock.unlock() }
            self.forwardOn(.next(state))
            return state &+ 1
        }
    }
}

run里面主要就是schedulePeriodi,起始时间默认为0,后面两个都是timer序列的间隔时间,最后还有个闭包。这个函数会在我们外面设置的那个主线程scheduler中调用,为了防止多线程调用导致数据错误,这里加了线程锁。state &+ 1也对应了FixedWidthInteger协议,防止溢出。继续钻进去:

MainScheduler的父类SerialDispatchQueueSchedulerschedulePeriodi的实现:

public func schedulePeriodic<StateType>(_ state: StateType, 
                                        startAfter: RxTimeInterval, 
                                        period: RxTimeInterval, 
                                        action: @escaping (StateType) -> StateType) -> Disposable {
    return self.configuration.schedulePeriodic(state, 
                                                startAfter: startAfter, 
                                                period: period, 
                                                action: action)
}

又是一个接口:

func schedulePeriodic<StateType>(_ state: StateType, 
                                startAfter: RxTimeInterval, 
                                period: RxTimeInterval, 
                                action: @escaping (StateType) -> StateType) -> Disposable {
    let initial = DispatchTime.now() + startAfter
    var timerState = state

    let timer = DispatchSource.makeTimerSource(queue: self.queue)
    timer.schedule(deadline: initial, repeating: period, leeway: self.leeway)
        
    var timerReference: DispatchSourceTimer? = timer
    let cancelTimer = Disposables.create {
        timerReference?.cancel()
        timerReference = nil
    }

    timer.setEventHandler(handler: {
        if cancelTimer.isDisposed {
            return
        }
        timerState = action(timerState)
    })
    timer.resume()
        
    return cancelTimer
}

看到这里一下就明白了,从刚开始的参数类型开始就一直和GCD有关联,原来就是用序列封装了DispatchSourceTimer,在定时器触发时发送序列的.next信号。在序列销毁时canceltimer

图解RxSwift中的Timer.jpg

总结

  1. 创建Timer序列时,保存需要的参数。
  2. 订阅信号时构建TimerSink管道。
  3. 管道run起来,用 GCD 中的 Timer 在eventHandler中发出信号。