XXl-job 部分源码解析

3,982 阅读4分钟

前言

文本已收录至我的GitHub仓库,欢迎Star:https://github.com/bin392328206/six-finger
种一棵树最好的时间是十年前,其次是现在
鼓励大家在技术的路上写博客

絮叨

因为之前用xxl-job 用的也很多了,然后自己也把他的边缘业务代码改换成自己公司合适的用法了,比如说我们的动态定时任务,调用接口啥的,但是对应他的核心源码,一直没有机会梳理一下,固找个时间,一起梳理梳理,但是我想说的是本文争对的是原理的解读,不适合XXL-Job的初学者,我这边默认你已经对于XXl-job至少是有一定的了解,如果要去入门,先去下面的官方文档

架构图

上图是我们要进行源码分析的2.1版本的整体架构图。其分为两大块,调度中心和执行器,我们先分析调度中心,也就是xxl-job-admin这个包的代码。

主流程

XXL-JOB 他是基于SpringBoot的,所以我们的第一步当然是启动Spring的应用程序 XxlJobAdminApplication,然后他里面有个配置类XxlJobAdminConfig 实现了 InitializingBean ,Spring在初始化之后会加载这个Bean,如下图

然后我们来看看 xxlJobScheduler.init();方法,如下图

  • 第一步国际化相关。
  • 第二步监控相关。
  • 第三步失败重试相关。
  • 第四步启动admin端服务,接收注册请求等。
  • 第五步JobScheduleHelper调度器,死循环,在xxl_job_info表里取将要执行的任务,更新下次执行时间的,调用JobTriggerPoolHelper类,来给执行器发送调度任务的

其实上面的步骤还是很多地方需要我们去一个个了解的,但是我们今天就挑些来学习,我们直接看最后一步JobScheduleHelper

JobScheduleHelper

非常重要的一个类,我们来看看他的start()方法

其实很简单,就是启动了2后台线程,这2个线程是死循环的,但是这2个线程确也是很重要的。我们一个个来看

我直接把代码拷贝了过来,然后我们来慢慢说吧

  • 死循环内的代码如上图,首先利用for update语句进行获取任务的资格锁定,再去获取未来5秒内即将要执行的任务。

其实这种代码我们撸业务的。谁不是闭着眼就来,哈哈 返回的是一个JobInfo的List,如果是我们写代码会怎么样?第一个当然是判断是否为空,然后遍历去处理每一个对象,还别说xxl-job也是这样的,然后根据对象里面的属性不同做不同的业务逻辑处理,哈哈,是不是觉得开源框架和我们平时写的代码也差不多。

然后根据triggerNextTime这个字段来走不同的分支,一共有3个分支,我们来看看这几个分支的逻辑

  • 第一个分支当前任务的触发时间已经超时5秒以上了,不在执行,直接计算下一次触发时间。
  • 第二个分支为触发时间已满足,利用JobTriggerPoolHelper这个类进行任务调度,之后判断下一次执行时间如果在5秒内,进行此任务数据的缓存,处理逻辑与第三个分支一样。
  • 我们来看看那个pushTimeRing(ringSecond, jobInfo.getId());
    private void pushTimeRing(int ringSecond, int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);

        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
    }

ringData是以0到59的整数为key,以jobId集合为value的Map集合。这个集合数据的处理逻辑,就在我们第二个守护线程ringThread中。

  • 我们来看看我们的第二个守护线程吧
  // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // align second
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                while (!ringThreadToStop) {

                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }

                    // next second, align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();

根据当前秒数刻度和前一个刻度进行时间轮的任务获取,之后和上文一样,利用JobTriggerPoolHelper进行任务调度。

时序图

日常求赞

好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是人才

创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见

六脉神剑 | 文 【原创】如果本篇博客有任何错误,请批评指教,不胜感激 !

本文使用 mdnice 排版