Python定时任务框架:APScheduler源码剖析(二)

2,057 阅读9分钟

Python定时任务框架:APScheduler源码剖析(二)

前言

在上一篇文章中,简单的捋了一遍使用BackgroundScheduler调度器做定时任务的流程,本篇接着上一篇文章,分析一下_real_add_job方法和_main_loop方法。

虽然APScheduler有多种不同的调度器以及多种不同的使用方式,但其核心都是类似的,一通百通。

对了,文中分析的APScheduler是当前最新版3.6.1。

剖析_real_add_job

回顾一下上篇文章添加任务对象的大致逻辑。

实例化BackgroundScheduler --> 调用add_job方法 --> 最终调用_real_add_job

_real_add_job源码如下,其代码的大致逻辑以给出相应的注释。

# apscheduler/schedulers/base.py/BaseScheduler

    def _real_add_job(self, job, jobstore_alias, replace_existing):
        """
        将任务对象添加到指定的存储后端中(默认就是内存中-->dict)
        """
        
        # 使用默认值填写未定义的值
        replacements = {}
        for key, value in self._job_defaults.items():
            if not hasattr(job, key):
                replacements[key] = value

        # 如果未定义下次运行时间,则计算下次运行时间
        if not hasattr(job, 'next_run_time'):
            now = datetime.now(self.timezone)
            replacements['next_run_time'] = job.trigger.get_next_fire_time(None, now)

        # 应用任何替换
        job._modify(**replacements)

        # 将作业添加到给定的作业库
        store = self._lookup_jobstore(jobstore_alias)
        try:
            store.add_job(job)
        except ConflictingIdError:
            if replace_existing:
                store.update_job(job)
            else:
                raise

        # 将任务对象标记为非待定
        job._jobstore_alias = jobstore_alias

        # 通知监听器已添加新任务
        event = JobEvent(EVENT_JOB_ADDED, job.id, jobstore_alias)
        self._dispatch_event(event)

        self._logger.info('Added job "%s" to job store "%s"', job.name, jobstore_alias)

        # 通知调度程序有关新工作的信息
        if self.state == STATE_RUNNING:
            self.wakeup()

代码中已有大致的注释,这里再简单分析一下。

一开始定义replacements字典,然后循环处理_job_defaults字典,判断该字典中的key是否是job任务对象的属性,如果是,则添加到replacements字典中。

接着同样的方式,判断job任务对象是否存在next_run_time属性,如果不存在,则需要调用当前任务对象中触发器(trigger)的get_next_fire_time方法计算出当前任务对象下一次要运行的时间。

随后调用job任务对象_modify方法,该方法的作用就是修改job任务对象的属性。

怎么实现属性的修改?没想象的那样使用了什么高深的技巧,看该方法的源码,就定义了approved字典,然后将replacements字典中的值获取并判断是否替换,替换就放到approved字典中,最后遍历approved字典,利用setattr方法将该字典中的值设置为job对象的属性,实现job对象属性的修改。

接着调用_lookup_jobstore方法找到用于存储当前任务对象的job stores(任务存储器),jobstore_alias是_real_add_job方法的参数,该方法是add_job方法调用的,往回看,可知jobstore_alias默认为default字符串,则选择内存作为job stores,然后调用job sotres的add_job方法将任务对象加入其中。

为啥说job stores为default就是使用内存作为job store呢?

看到BaseScheduler类的start方法,该方法部分逻辑如下。

# apscheduler/scheduleers/base.py/BaseScheduler

 with self._jobstores_lock:
            # Create a default job store if nothing else is configured
            if 'default' not in self._jobstores:
                self.add_jobstore(self._create_default_jobstore(), 'default')

其调用了_create_default_jobstore方法创建默认jobstore,该方法代码如下。

# apscheduler/scheduleers/base.py/BaseScheduler

def _create_default_jobstore(self):
        """Creates a default job store, specific to the particular scheduler type."""
        return MemoryJobStore() # 创建内存作为jobstore

看MemoryJobStore类的__init__方法,就可以知道它使用dict来作为最终的存储对象。

# apscheduler/jobstores/memory.py

class MemoryJobStore(BaseJobStore):
    """
    Stores jobs in an array in RAM. Provides no persistence support.

    Plugin alias: ``memory``
    """

    def __init__(self):
        super().__init__()
        # list of (job, timestamp), sorted by next_run_time and job id (ascending)
        self._jobs = []
        self._jobs_index = {}  # id -> (job, timestamp) lookup table

回到_real_add_job方法,通过_lookup_jobstore方法寻找job store并通过add_job方法将其添加到相应的job store后,修改job任务对象_jobstore_alias,将任务对象标记为非待定状态。

随后实例化JobEvent类,然后调用_dispatch_event方法实现事件消息通知,这里的细节暂时不看,简单而言就是当调度器因某个任务被触发了,而自身自定义了一些相关的监听器,这些监听器此时就会被调用。

做完这些操作后,就会唤醒调度器线程,通过wakeup方法,该方法在BaseScheduler中是被abstractmethod装饰器装饰的空方法,因为abstractmethod,所以其子类必须重写wakeup方法。该方法的代码在BlockingScheduler中,作用就是调用set()方法唤醒线程。

# apscheduler/schedulers/blocking.py/BlockingScheduler

def wakeup(self):
   # set()方法会将事件标志状态设置为true。
   self._event.set() # 唤醒,避免过长的休眠

剖析get_next_fire_time

回顾一下_real_add_job,其中一段代码如下。

if not hasattr(job, 'next_run_time'):
            now = datetime.now(self.timezone)
            replacements['next_run_time'] = job.trigger.get_next_fire_time(None, now)

get_next_fire_time方法挺关键的,上述代码中传入了None和当前地区的时间戳,该方法计算方式如下。

# apscheduler/triggers/interval.py/IntervalTrigger

    def get_next_fire_time(self, previous_fire_time, now):
        if previous_fire_time:
            # 上次运行时间加上间隔时间获得下一次要运行的时间
            next_fire_time = previous_fire_time + self.interval
        elif self.start_date > now:
            next_fire_time = self.start_date
        else:
            timediff_seconds = timedelta_seconds(now - self.start_date)
            next_interval_num = int(ceil(timediff_seconds / self.interval_length))
            next_fire_time = self.start_date + self.interval * next_interval_num

        if self.jitter is not None:
            next_fire_time = self._apply_jitter(next_fire_time, self.jitter, now)

        if not self.end_date or next_fire_time <= self.end_date:
            return self.timezone.normalize(next_fire_time)

get_next_fire_time方法主要就是计算任务对象下次要运行的时间。

不同的触发器,其get_next_fire_time方法不同。

剖析_main_loop

在剖析_main_loop前, 先回顾一下BackgroundScheduler类的继承。

BackgroundScheduler --> BlockingScheduler --> BaseScheduler

BaseScheduler是元类。

BlockingScheduler适合于只在进程中运行单个任务的情况,通常在调度器是你唯一要运行的东西时使用。

BackgroundScheduler基于BlockingScheduler,相比于BlockingScheduler,其实就创建一下线程来实现后台处理的效果,其余整体与BlockingScheduler没有什么区别。

在BackgroundScheduler的start方法中,创建另一个线程来启动_main_loop方法,该方法的逻辑在BlockingScheduler类中,BlockingScheduler类完整代码如下。

class BlockingScheduler(BaseScheduler):
    """
    A scheduler that runs in the foreground
    (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block).
    """
    _event = None

    def start(self, *args, **kwargs):
        self._event = Event()
        super().start(*args, **kwargs)
        self._main_loop()

    def shutdown(self, wait=True):
        super().shutdown(wait)
        self._event.set()

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            # 等待事件通知,wait_seconds为等待事件通知的超时时间
            # wait()方法会阻塞线程,直到事件标志状态为true。
            self._event.wait(wait_seconds)
            # clear()方法将事件标志状态设置为false
            self._event.clear()
            wait_seconds = self._process_jobs()

    def wakeup(self):
        # set()方法会将事件标志状态设置为true。唤醒线程
        self._event.set()

先只需关注其_main_loop方法,该方法一开始定义了wait_seconds等待时间,定位threaing.TIMEOUT_MAX,然后判断调度器当前的状态,如果不是停止状态,则通过线程事件的wait方法阻塞线程,要解开阻塞,要么_real_add_job方法调用了wakeup方法将线程唤醒,如果调度器线程是被唤醒的,说明有任务对象需要它处理,另种解开阻塞的方式就是等待其超时。

当线程被唤醒后,立刻又会调用clear方法,将线程事件状态设置为false,当再次循环到wait时会再次被阻塞。

_main_loop方法中最关键的其实是_process_jobs方法,该方法会遍历所有的job store,调用执行器执行到时间的job并计算出每个job store中的job下次的执行时间,将其中最短的时间返回,调度器会阻塞等待相应的时间。

_process_jobs方法源码如下。

# apscheduler/schedulers/base.py/BaseScheduler

       def _process_jobs(self):
        if self.state == STATE_PAUSED:
            self._logger.debug('Scheduler is paused -- not processing jobs')
            return None

        self._logger.debug('Looking for jobs to run')
        now = datetime.now(self.timezone) # 当前时间
        next_wakeup_time = None
        events = []

        with self._jobstores_lock:
            # 从_jobstores中获取当前要处理的任务
            for jobstore_alias, jobstore in self._jobstores.items():
                try:
                    # 以当前时间为基准,判断是否到了执行时间
                    due_jobs = jobstore.get_due_jobs(now)
                except Exception as e:
                    self._logger.warning('Error getting due jobs from job store %r: %s',
                                         jobstore_alias, e)
                    # 唤醒时间
                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                        next_wakeup_time = retry_wakeup_time

                    continue

                for job in due_jobs:
                    # 搜索当前任务对象的执行器
                    try:
                        executor = self._lookup_executor(job.executor)
                    except BaseException:
                        self._logger.error(
                            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
                            'job store', job.executor, job)
                        self.remove_job(job.id, jobstore_alias)
                        continue
                    # 获得运行时间
                    run_times = job._get_run_times(now)
                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
                    if run_times:
                        try:
                            # 提交这个任务给执行器
                            executor.submit_job(job, run_times)
                        except MaxInstancesReachedError:
                            self._logger.warning(
                                'Execution of job "%s" skipped: maximum number of running '
                                'instances reached (%d)', job, job.max_instances)
                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                                       jobstore_alias, run_times)
                            events.append(event)
                        except BaseException:
                            self._logger.exception('Error submitting job "%s" to executor "%s"',
                                                   job, job.executor)
                        else:
                            event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
                                                       run_times)
                            events.append(event)

                        # 计算任务对象下一次执行时间
                        job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
                        if job_next_run:
                            # 修改job数学
                            job._modify(next_run_time=job_next_run)
                            # 将修改后的job更新到jobstore中
                            jobstore.update_job(job)
                        else:
                            # 没有下次运行时间了,则说明这个任务对象结束了,将其移除jobstore
                            self.remove_job(job.id, jobstore_alias)
                            
                # 设置新的下次唤醒时间
                jobstore_next_run_time = jobstore.get_next_run_time()
                if jobstore_next_run_time and (next_wakeup_time is None or
                                               jobstore_next_run_time < next_wakeup_time):
                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

        # Dispatch collected events
        for event in events:
            self._dispatch_event(event)

        # Determine the delay until this method should be called again
        if self.state == STATE_PAUSED:
            wait_seconds = None
            self._logger.debug('Scheduler is paused; waiting until resume() is called')
        elif next_wakeup_time is None:
            wait_seconds = None
            self._logger.debug('No jobs; waiting until a job is added')
        else:
            wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
                               wait_seconds)

        return wait_seconds

_process_jobs方法似乎比较长,但逻辑其实很直观。

先计算出当前时间now,然后获取_jobstores_lock可重入锁,然后遍历所有的jobstores,调用get_due_jobs方法获取当前时间要执行的任务,memory内存任务存储器其get_due_jobs方法代码如下。

# apscheduler/jobstores/memory.py/MemoryJobStore

    def __init__(self):
        super().__init__()
        # list of (job, timestamp), sorted by next_run_time and job id (ascending)
        self._jobs = []
        self._jobs_index = {}  # id -> (job, timestamp) lookup table

    def get_due_jobs(self, now):
        now_timestamp = datetime_to_utc_timestamp(now)
        pending = []
        for job, timestamp in self._jobs:
            if timestamp is None or timestamp > now_timestamp:
                break
            pending.append(job)

        return pending

MemoryJobStore.get_due_jobs方法的逻辑就是遍历当前所有任务对象的下次执行时间,如果执行时间小于或等于当前时间,则加入pending列表中返回。

获得到当然要执行的任务对象列表后,遍历这个列表,然后调用_lookup_executor方法从中取出当前任务对象的执行器,如果没有执行器,则通过remove_job方法将该任务从相应的job store移除。

随后调用_get_run_times方法获取运行时间,该方法代码如下。

# apscheduler/job.py/Job

    def _get_run_times(self, now):
        run_times = []
        next_run_time = self.next_run_time
        while next_run_time and next_run_time <= now:
            run_times.append(next_run_time)
            next_run_time = self.trigger.get_next_fire_time(next_run_time, now)

        return run_times

_get_run_times方法运行逻辑就是获取self.next_run_time,然后判断是否小于等于当前时间,如果是,则加入run_times列表,并调用get_next_fire_time方法计算出下一次要运行的时间。

回到_process_jobs方法,获得run_times后,调用执行器的submit_job方法将当前任务对象、运行时间作为参数提交给执行器,执行器会去执行任务对象具体的逻辑。

随后调用get_next_fire_time方法计算下次任务的执行时间,计算出的结果通过_modify方法修改到job任务对象上,此外还通过update_job方法将任务更新到相应的job store中。

最后调用job store的get_next_run_time方法,计算job store下次要执行的时间,这里依旧看一下内存job store中该方法的逻辑。

#apscheduler/jobstores/memory.py/MemoryJobStore

   def get_next_run_time(self):
        return self._jobs[0][0].next_run_time if self._jobs else None

结尾

本文展示了比较多的代码细节,当然还有很多细节没有谈及,你看到这里说明挺感兴趣的,这里简单总结一下前面的内容,理一理完整的逻辑。

添加job到相应job store的逻辑链条为。

BaseScheduler.add_job方法添加任务对象 --> BaseScheduler._real_add_job方法才是具体的添加任务对象堕落街 --> BaseScheduler._lookup_jobstore方法查到当前任务对象对应的job store --> store.add_job(job)方法将任务对象添加到job store中 --> BlockingScheduler.wakeup方法唤醒调度器线程。

_main_loop方法主循环的逻辑链条为。

BlockingScheduler._main_loop方法为主循环最上层方法 --> _process_jobs方法进行具体的调度执行器执行任务以及计算任务对象等下次执行事情的逻辑。

APScheduler源码还没有读完,下篇来看看执行器是怎么执行任务对象的。

如果文章对你有所帮助,点击「在看」支持二两,下篇文章见。