分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业配置

1,665 阅读9分钟
原文链接: www.iocoder.cn
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 www.iocoder.cn/Elastic-Job… 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Elastic-Job-Lite 作业配置

涉及到主要类的类图如下( 打开大图 ):

  • 黄色的类在 elastic-job-common-core 项目里,为 Elastic-Job-Lite、Elastic-Job-Cloud 公用作业配置类。

另外建议你已经( 非必须 ):

你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门

2. 作业配置

一个作业( ElasticJob )的调度,需要配置独有的一个作业调度器( JobScheduler ),两者是 1 : 1 的关系。这点大家要注意下,当然下文看代码也会看到。

作业调度器的创建可以配置四个参数:

  1. 注册中心( CoordinatorRegistryCenter ):用于协调分布式服务。必填
  2. Lite作业配置( LiteJobConfiguration ):必填
  3. 作业事件总线( JobEventBus ):对作业事件异步监听。选填
  4. 作业监听器( ElasticJobListener ):对作业执行前,执行后进行同步监听。选填

2.1 注册中心配置

Elastic-Job 抽象了注册中心接口( RegistryCenter ),并提供了默认基于 Zookeeper 的注册中心实现( ZookeeperRegistryCenter )

ZookeeperRegistryCenter 对应配置类为 ZookeeperConfiguration。该类注释很完整,可以点击链接直接查看源码,这里我们重点说下 namespace 属性。如果你有多个不同 Elastic-Job集群 时,使用相同 Zookeeper,可以配置不同的 namespace 进行隔离。

注册中心的初始化,我们会在《Elastic-Job-Lite 源码解析 —— 注册中心》详细分享。

2.2 Lite作业配置

LiteJobConfiguration 继承自接口 JobRootConfiguration,作为 Elastic-Job-Lite 里的作业( LiteJob )配置。Elastic-Job-Cloud 的作业( CloudJob )对应另外的配置类,也实现了该接口。

public final class LiteJobConfiguration implements JobRootConfiguration {
    
    private final JobTypeConfiguration typeConfig;
    private final boolean monitorExecution;
    private final int maxTimeDiffSeconds;
    private final int monitorPort;
    private final String jobShardingStrategyClass;
    private final int reconcileIntervalMinutes;
    private final boolean disabled;
    private final boolean overwrite;
    
    // .... 省略部分get方法
    
    public static class Builder {
    
        // .... 省略部分属性
    
        public final LiteJobConfiguration build() {
            return new LiteJobConfiguration(jobConfig, monitorExecution, maxTimeDiffSeconds, monitorPort, jobShardingStrategyClass, reconcileIntervalMinutes, disabled, overwrite);
        }
    }
}
  • typeConfig:作业类型配置。必填
  • monitorExecution:监控作业运行时状态。默认为 false。选填。在《Elastic-Job-Lite 源码解析 —— 作业执行》详细分享。

    每次作业执行时间和间隔时间均非常短的情况,建议不监控作业运行时状态以提升效率。因为是瞬时状态,所以无必要监控。请用户自行增加数据堆积监控。并且不能保证数据重复选取,应在作业中实现幂等性。
    每次作业执行时间和间隔时间均较长的情况,建议监控作业运行时状态,可保证数据不会重复选取。

  • monitorPort:作业监控端口。默认为 -1,不开启作业监控端口。选填。在《Elastic-Job-Lite 源码解析 —— 作业监控服务》详细分享。

    建议配置作业监控端口, 方便开发者dump作业信息。
    使用方法: echo “dump” | nc 127.0.0.1 9888

  • maxTimeDiffSeconds:设置最大容忍的本机与注册中心的时间误差秒数。默认为 -1,不检查时间误差。选填。

  • jobShardingStrategyClass:作业分片策略实现类全路径。默认为使用分配侧路。选填。在《Elastic-Job-Lite 源码解析 —— 作业分片策略》详细分享。
  • reconcileIntervalMinutes:修复作业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复。默认为 10。在《Elastic-Job-Lite 源码解析 —— 自诊断修复 》详细分享。

  • disabled:作业是否禁用执行。默认为 false。选填。

  • overwrite:设置使用本地作业配置覆盖注册中心的作业配置。默认为 false。选填。建议使用运维平台( console )配置作业配置,统一管理。
  • Builder 类:使用该类配置 LiteJobConfiguration 属性,调用 #build() 方法最终生成作业配置。参见:《JAVA设计模式 — 生成器模式(Builder)》

2.2.1 作业类型配置

作业类型配置接口( JobTypeConfiguration ) 有三种配置实现,针对三种作业类型:

配置实现 作业 说明
SimpleJobConfiguration SimpleJob 简单作业。例如:订单过期作业
DataflowJobConfiguration DataflowJob 数据流作业。TODO:笔者暂时未了解流式处理数据,不误人子弟
ScriptJobConfiguration ScriptJob 脚本作业。例如:调用 shell 脚本备份数据库作业

三种配置类属性对比如:

属性 SimpleJob DataflowJob ScriptJob 说明
coreConfig 作业核心配置
jobType JobType.SIMPLE JobType.DATAFLOW JobType.SCRIPT 作业类型
jobClass √ (默认:ScriptJob.class) 作业实现类全路径
streamingProcess 是否流式处理数据
scriptCommandLine 脚本型作业执行命令行

作业类型配置不仅仅适用于 Elastic-Job-Lite,也适用于 Elastic-Job-Cloud。

2.2.2 作业核心配置

作业核心配置( JobCoreConfiguration ),我们可以看到在每种作业类型配置都有该属性( coreConfig )。

public final class JobCoreConfiguration {
    
    private final String jobName;
    private final String cron;
    private final int shardingTotalCount;
    private final String shardingItemParameters;
    private final String jobParameter;
    private final boolean failover;
    private final boolean misfire;
    private final String description;
    private final JobProperties jobProperties;
    
    public static class Builder {
    
        // .... 省略部分属性
    
        public final JobCoreConfiguration build() {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(jobName), "jobName can not be empty.");
            Preconditions.checkArgument(!Strings.isNullOrEmpty(cron), "cron can not be empty.");
            Preconditions.checkArgument(shardingTotalCount > 0, "shardingTotalCount should larger than zero.");
            return new JobCoreConfiguration(jobName, cron, shardingTotalCount, shardingItemParameters, jobParameter, failover, misfire, description, jobProperties);
        }
    }
}
  • jobName:作业名称。必填。
  • cron:cron表达式,用于控制作业触发时间。必填。
  • shardingTotalCount:作业分片总数。如果一个作业启动超过作业分片总数的节点,只有 shardingTotalCount 会执行作业。必填。《Elastic-Job-Lite 源码解析 —— 作业分片策略 》详细分享。
  • shardingItemParameters:分片序列号和参数。选填。

    分片序列号和参数用等号分隔,多个键值对用逗号分隔
    分片序列号从0开始,不可大于或等于作业分片总数
    如:
    0=a,1=b,2=c

  • jobParameter:作业自定义参数。选填。

    作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业
    例:每次获取的数据量、作业实例从数据库读取的主键等

  • failover:是否开启作业执行失效转移。开启表示如果作业在一次作业执行中途宕机,允许将该次未完成的作业在另一作业节点上补偿执行。默认为 false。选填。在《Elastic-Job-Lite 源码解析 —— 作业失效转移 》详细分享。

  • misfire:是否开启错过作业重新执行。默认为 true。选填。在《Elastic-Job-Lite 源码解析 —— 作业执行 》详细分享。
  • description:作业描述。选填。
  • jobProperties:作业属性配置。选填。在《Elastic-Job-Lite 源码解析 —— 作业执行 》详细分享。

    public final class JobProperties {
        
        private EnumMap<JobPropertiesEnum, String> map = new EnumMap<>(JobPropertiesEnum.class);
        
       public enum JobPropertiesEnum {
            
            /**
             * 作业异常处理器.
             */
            JOB_EXCEPTION_HANDLER("job_exception_handler", JobExceptionHandler.class, DefaultJobExceptionHandler.class.getCanonicalName()),
            
            /**
             * 线程池服务处理器.
             */
            EXECUTOR_SERVICE_HANDLER("executor_service_handler", ExecutorServiceHandler.class, DefaultExecutorServiceHandler.class.getCanonicalName());
            
            private final String key;
        
            private final Class<?> classType;
            
            private final String defaultValue;
       }
    }
    
    • JOB_EXCEPTION_HANDLER:用于扩展异常处理类。
    • EXECUTOR_SERVICE_HANDLER:用于扩展作业处理线程池类。
    • 通过这个属性,我们可以自定义每个作业的异常处理和线程池服务。

2.3 作业事件配置

通过作业事件配置( JobEventConfiguration ),实现对作业事件的异步监听、处理。在《Elastic-Job-Lite 源码解析 —— 作业事件追踪》详细分享。

2.4 作业监听器

通过配置作业监听器( ElasticJobListener ),实现对作业执行的同步监听、处理。在《Elastic-Job-Lite 源码解析 —— 作业监听器》详细分享。

3. 作业配置服务

多个 Elastic-Job-Lite 使用相同注册中心和相同 namespace 组成集群,实现高可用。集群中,使用作业配置服务( ConfigurationService ) 共享作业配置。

public final class ConfigurationService {

    /**
     * 时间服务
     */
    private final TimeService timeService;
    /**
     * 作业节点数据访问类
     */
    private final JobNodeStorage jobNodeStorage;
    
    public ConfigurationService(final CoordinatorRegistryCenter regCenter, final String jobName) {
        jobNodeStorage = new JobNodeStorage(regCenter, jobName);
        timeService = new TimeService();
    }
}
  • JobNodeStorage,封装注册中心,提供存储服务。在《Elastic-Job-Lite 源码解析 —— 作业数据存储》详细分享。
  • TimeService,时间服务,提供当前时间查询。

    public final class TimeService {
        
        /**
         * 获取当前时间的毫秒数.
         * 
         * @return 当前时间的毫秒数
         */
        public long getCurrentMillis() {
            return System.currentTimeMillis();
        }
    }
    

3.1 读取作业配置

/**
* 读取作业配置.
* 
* @param fromCache 是否从缓存中读取
* @return 作业配置
*/
public LiteJobConfiguration load(final boolean fromCache) {
   String result;
   if (fromCache) { // 缓存
       result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
       if (null == result) {
           result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
       }
   } else {
       result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
   }
   return LiteJobConfigurationGsonFactory.fromJson(result);
}

3.2 持久化作业配置

/**
* 持久化分布式作业配置信息.
* 
* @param liteJobConfig 作业配置
*/
public void persist(final LiteJobConfiguration liteJobConfig) {
   checkConflictJob(liteJobConfig);
   if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
       jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
   }
}
  • 调用 #checkConflictJob(...) 方法校验注册中心存储的作业配置的作业实现类全路径( jobClass )和当前的是否相同,如果不同,则认为是冲突,不允许存储:

    private void checkConflictJob(final LiteJobConfiguration liteJobConfig) {
       Optional<LiteJobConfiguration> liteJobConfigFromZk = find();
       if (liteJobConfigFromZk.isPresent()
               && !liteJobConfigFromZk.get().getTypeConfig().getJobClass().equals(liteJobConfig.getTypeConfig().getJobClass())) { // jobClass 是否相同
           throw new JobConfigurationException("Job conflict with register center. The job '%s' in register center's class is '%s', your job class is '%s'", 
                   liteJobConfig.getJobName(), liteJobConfigFromZk.get().getTypeConfig().getJobClass(), liteJobConfig.getTypeConfig().getJobClass());
       }
    }
    
  • 当注册中心未存储该作业配置 或者 当前作业配置允许替换注册中心作业配置( overwrite = true )时,持久化作业配置。

3.3 校验本机时间是否合法

/**
* 检查本机与注册中心的时间误差秒数是否在允许范围.
* 
* @throws JobExecutionEnvironmentException 本机与注册中心的时间误差秒数不在允许范围所抛出的异常
*/
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
   int maxTimeDiffSeconds =  load(true).getMaxTimeDiffSeconds();
   if (-1  == maxTimeDiffSeconds) {
       return;
   }
   long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
   if (timeDiff > maxTimeDiffSeconds * 1000L) {
       throw new JobExecutionEnvironmentException(
               "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
   }
}
  • Elastic-Job-Lite 作业触发是依赖本机时间,相同集群使用注册中心时间为基准,校验本机与注册中心的时间误差是否在允许范围内( LiteJobConfiguration.maxTimeDiffSeconds )。

666. 彩蛋

知识星球

Elastic-Job-Lite 源码解析系列第一篇文章,希望大家多多支持,预计全部更新完会有 15+ 篇。Elastic-Job-Cloud 源码系列后续也会更新。

道友,分享一波微信朋友圈支持支持支持,可好?