Activiti6.0的探索纪要

5,705 阅读15分钟

前言

年初要做一个运维自动化平台,需要用到流程引擎,本来打算项目用golang写的,但是golang的流程引擎功能太简单实在是用不来,最后还是选型java + activiti。到activiti官网一看,嘿出7.0了结果文档是刚写的还不全,我们java还是8的,7.0是匹配的java11,最终是问题太多只好放弃用activiti6.0了。

摸石头过河

虽然网上教程有不少,不过要真正跑起来着实不容易,有些内容比如参数的意义还是看5.0的手册才弄明白的。

安装

解决依赖先跑起来

搭建环境:java8 + springboot2.1.3 + activiti6.0 + mysql

pom依赖:

<dependency>
    <groupId>org.activiti</groupId>
    <artifactId>activiti-spring-boot-starter-basic</artifactId>
	<version>${activiti.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

配置数据库:

spring.datasource.url=jdbc:mysql://127.0.0.1:3306/activiti?nullCatalogMeansCurrent=true&serverTimezone=Asia/Shanghai
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=root

然后启动,恭喜你,应用没起来,会出现如下报错:

Caused by: java.io.FileNotFoundException: class path resource [org/springframework/security/config/annotation/authentication/configurers/GlobalAuthenticationConfigurerAdapter.class] cannot be opened because it does not exist:

这个原因是activiti6.0开发的时候springboot2.0还没出来以致年久失修,文件路径不对了,解决方案:

启动类前排除掉SecurityAutoConfiguration

@SpringBootApplication(exclude = SecurityAutoConfiguration.class)

再次启动,如果你碰到创建数据库表却找不到表的问题,两种解决方法:

  • 手动创建表,表的路径在activiti-engine-6.0.0.jar/org/activiti/db/create
  • 数据库配置中加上nullCatalogMeansCurrent=true

这是因为在org/activiti/engine/impl/db/DbSqlSession中activiti使用databaseMetaData.getTables寻找库表是否存在,而dbSqlSessionFactory获取到的catalog为空因为mysql使用schema标识库名而不是catalog,导致mysql扫描所有的库来找表,一旦其他库中有同名表activiti就以为找到了其实表并不存在。nullCatalogMeansCurrent的意义就在于让mysql默认当前库,在mysql-connector-java 5.x该参数默认为true,但在6.x以上默认为false。

再次启动,仍然没起来:

Caused by: java.io.FileNotFoundException: class path resource [processes/] cannot be resolved to URL because it does not exist

这是因为activiti会到resources/processes下面寻找流程文件,创建该目录;application.properties中加上spring.activiti.check-process-definitions=false

配置画图插件

activiti提供了多种设计器来画流程图,我安装了IDEA的插件和eclipse的插件,比较了下还是eclipse的好用,所以只介绍eclipse 插件的安装过程。

  1. 下个eclipse
  2. 下载插件 Help --> Install New Software -->Add:
Names : Activiti BPMN 2.0 designer
Location : https://www.activiti.org/designer/update/
  1. 下完了New一个diagram

4. 画流程图,一个标准的流程图有一个开始事件和一个结束事件,画完了保存为.bpmn文件或者.bpmn20.xml文件,并放到resource/processes目录下,一个xml示例文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" xmlns:tns="http://www.activiti.org/test" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test" id="m1551347612734" name="">
  <process id="machine_expansion" name="Machine Expansion" isExecutable="true" isClosed="false" processType="None">
    <startEvent id="start" name="开始"></startEvent>
    ...

运行

使用runtimeservice创建一个createProcessInstanceBuilder,设置processDefinitionKey为xml中的process id,然后start就可以了。

    @Autowired
    private RuntimeService runtimeService;
    @Override
    public String start(BaseParamDTO baseParamDTO) {
        String currentUserName = CurrentUserUtil.getCurrentUserName();
        identityService.setAuthenticatedUserId(currentUserName);
        ExpansionParamDTO expansionParamDTO = (ExpansionParamDTO) baseParamDTO.getParams();
        ProcessInstance machineExpansion = runtimeService.createProcessInstanceBuilder().processDefinitionKey(WorksheetTypeEnum.EXPANSION.getActivitiDefineName())
                .businessKey(expansionParamDTO.getAppName()).name(WorksheetTypeEnum.EXPANSION.name()).start();
        baseParamDTO.setId(machineExpansion.getProcessInstanceId());
        submit(baseParamDTO);
        return machineExpansion.getProcessInstanceId();
    }

为了设置流程的startuser,必须要使用identityService.setAuthenticatedUserId设置,因为process获取的是当前线程的用户。

数据库表

要熟悉一个项目,最简单的方式就是看他的数据库表是怎么设计的,表里面都存了什么东西,activiti数据库中一共有28张表

  • ACT_GE_*:通用表,一般不用关注,如果其他表中出现二进制的数据id,比如抛错,就到act_ge_bytearray中查询
  • ACT_HI_*:历史信息,activiti历史信息存储时分4个级别,可以用spring.activiti.historyLevel配置,对应HistoryService
    • none:什么也不保存
    • activity:保存所有的流程实例、任务、活动信息;
    • audit:默认级别,保存所有的流程实例、任务、活动、表单属性;
    • full:流程过程中全部保存,比audit多详情和变量;
  • ACT_ID_*:用户认证信息,对应IdentityService;
  • ACT_RE_*:流程中的部署和定义,对应RepositoryService;
  • ACT_RU_*:运行时信息,流程级别相关的表对应RuntimeService,task级别表对应TaskService,job级别表对应ManagementService。
表名 描述
act_evt_log 事件日志表
act_ge_bytearray 二进制数据表,比如抛错
act_ge_property activiti属性表
act_hi_actinst 历史的流程活动实例
act_hi_attachment 历史的流程附件
act_hi_comment 历史的描述信息
act_hi_detail 历史的流程运行中的细节信息
act_hi_identitylink 历史的用户关系信息
act_hi_procinst 历史的流程实例
act_hi_taskinst 历史的usertask实例
act_hi_varinst 历史的流程参数
act_id_group 用户组
act_id_info 用户详情信息
act_id_membership 用户和用户组关联关系
act_id_user 用户信息
act_procdef_info 流程定义扩展表
act_re_deployment 流程部署信息
act_re_model 流程模型
act_re_procdef 已部署过的流程
act_ru_deadletter_job 运行时中死掉的job
act_ru_event_subscr 运行时的事件监听
act_ru_execution 运行时的流程执行实例
act_ru_identitylink 运行时的用户关系信息
act_ru_job 运行时的job
act_ru_suspended_job 运行时暂停的job
act_ru_task 运行时的usertask实例
act_ru_timer_job 运行时的定时器job
act_ru_variable 运行时的参数

描述东西要举例

基础定义

在activiti中有几个基础定义:

  • process:一个流程实例,比如定义了一个审批流程bpmn,现在run起来了,这就是一个流程实例。
  • exection:流程执行实例,一个流程中有很多步骤,每一个步骤都会触发一个流程执行实例。
  • task:专门指usertask,当创建了usertask时,activiti会在exection中创建一个task。
  • job:执行步骤时,每个exection会生成job给jobExecutor去执行,如果是一般job就在act_ru_job中,定时任务就在act_ru_timer_job中,job暂停了就会进入act_ru_suspended_job,跑失败了就到act_ru_deadletter_job。
  • activity:这个词在历史表中出现,理解上和exection差不多,区别在于exection是一个执行实例,一个执行实例可以执行多个activity,而activity是指流程中的一个步骤,会对应到bpmn中某一步。

几种常用的task的区别

eclipse设计器总共有8种task,目前我只用到其中的4种

用户任务(User Task)

用户任务用来设置需要人操作才能完成的任务如审批,执行到usertask时,流程会卡住,只有用户人工触发了流程才会继续。示例如下,这里没有在画bpmn时写死用户,而是在流程中通过taskService设置assignee,usertask完成只能是complete,后续的流程可以用参数和排他网关控制。

    @Override
    public void submit(BaseParamDTO baseParamDTO) {
        String currentUserName = CurrentUserUtil.getCurrentUserName();
        ExpansionParamDTO expansionParamDTO = (ExpansionParamDTO) baseParamDTO.getParams();
        Task submit = taskService.createTaskQuery().processInstanceId(baseParamDTO.getId()).taskDefinitionKey(ExpansionDisplayEnum.SUBMIT.getFlow()).singleResult();
        taskService.setAssignee(submit.getId(), currentUserName);
        taskService.setVariable(submit.getId(), INPUT_PARAM, expansionParamDTO);
        taskService.setVariable(submit.getId(), INPUT_PRIORITY, baseParamDTO.getPriority());
        taskService.setPriority(submit.getId(), baseParamDTO.getPriority());
        if (StringUtils.isNotBlank(baseParamDTO.getComment())){
            taskService.addComment(submit.getId(), null, baseParamDTO.getComment());
        }
        taskService.complete(submit.getId());
    }

用户任务涉及到的几个概念:

  • assignee:该任务的执行人,谁执行的就写谁
  • candidateUsers:该任务的候选人,需要哪些人执行的就写谁
  • candidateGroups:该任务的候选组,需要哪个组的人执行的就写哪些组
  • comment:备注,这个信息会写到act_hi_comment表中,只能和usertask或者流程相关联

脚本任务(Script Task)

写一段脚本,执行到该步骤时自动执行脚本。(没用过)

服务任务(Service Task)

最常用的task,执行到该步骤时自动执行对应的java类

@Component
public class ApplyMachineTask implements JavaDelegate {

    public static final Logger LOGGER = LoggerFactory.getLogger(ApplyMachineTask.class);

    public static final String MACHINE_APPLY_ID = "applyId";
    public static final String MACHINE_APPLY_ID_LOCAL = "localApplyId";
    public static final String MACHINE_APPLY_CHECK = "applyCheckCount";

    @Autowired
    private MachineService machineService;

    @Override
    public void execute(DelegateExecution delegateExecution){
        ExpansionParamDTO param = delegateExecution.getVariable(INPUT_PARAM, ExpansionParamDTO.class);
        LOGGER.debug("start apply machine, param {}", param);
        param.setWorksheetId(delegateExecution.getProcessInstanceId());
        ApiResponseDTO<String> stringApiResponseDTO = machineService.applyMachine(param);
        if (stringApiResponseDTO.isSuccess()){
            delegateExecution.setVariable(MACHINE_APPLY_ID, stringApiResponseDTO.getBody());
            delegateExecution.setVariableLocal(MACHINE_APPLY_ID_LOCAL, stringApiResponseDTO.getBody());
            delegateExecution.setVariable(MACHINE_APPLY_CHECK, 0);
        } else {
            delegateExecution.setVariableLocal(ERROR_LOCAL, stringApiResponseDTO.getError());
            throw new BpmnError(ERROR_RETRY, stringApiResponseDTO.getError());
        }
    }
}

这里说下参数设置,setVariable会设置流程全局变量,也就是流程中任意一个步骤都能获取到的参数,用来参数传递,而setVariableLocal设置的是局部变量,只有当前步骤才能获取的参数,局部变量的用处在于可以记录流程执行到该步骤时的状态,参数涉及到覆盖问题,所以不要出现全局和局部参数同名的现象(子流程的除外)。

业务规则任务(BusinessRule Task)

业务规则用户用来同步执行一个或多个规则。activiti使用drools规则引擎执行业务规则。(没用过)

邮件任务(Mail Task)

自动邮件任务,它可以发送邮件给一个或多个参与者。(没用过)

手工任务(Manual Task)

手工任务定义了BPM引擎外部的任务。 用来表示工作需要某人完成,而引擎不需要知道,也没有对应的系统和UI接口。 对于引擎,手工任务是直接通过的活动, 流程到达它之后会自动向下执行。(没用过)

接收任务(Receive Task)

接收任务是一个简单任务,它会等待对应消息的到达。流程到达该步骤后会一直等待,直到消息触发。如下图所示,在一个接收任务上挂了一个定时边界事件,定时边界事件每隔一段时间触发服务任务检查状态,当状态通过时给接收任务发消息,接收任务完成,定时边界网关停止。

定时边界网关配置
消息触发:

 Execution receiveTask = runtimeService.createExecutionQuery().processInstanceId(execution.getProcessInstanceId())
    .activityId(ExpansionDisplayEnum.WAIT_INIT.getFlow()).singleResult();
runtimeService.trigger(receiveTask.getId());

调用子流程(CallActivity)

activity有两种子流程,一种是CallActivity,一种是subprocess,区别在于CallActivity是调用主流程外部的一个流程,运行时会生成新的processId,作用域完全独立,通常用来复用其他流程或者需要独立processId的情况;而subprocess是嵌套子流程,他并不会生成新的processId,但是定义了独立的事件域,通常用来捕捉同一类事件。

  1. 画一个子流程,这里不能选async,选了就跑不起来了,我的理解是调用CallActivity和执行新的子流程是不同的线程执行的,调用CallActivity只是为了生成创建子流程的job,而子流程是否异步执行是在multi instance页面配置的。
  2. main config页面配置要调用的子流程的process id和需要传的参数,这里参数的意思是主流程的参数名privilege传过去后叫参数名param
  3. multi instance多实例页配置,sequential就是配置是同步还是异步的地方,loop 可以直接指定实例个数,这里通过传参解决,定义collection表示会有一个列表名字叫privilegeList的全局参数,列表中每个参数实例的名字叫privilege,completion condition设置完成条件。当创建了多实例后activiti会自动创建多个参数:
  • nrOfInstances:实例总数
  • nrOfActiveInstances:当前活动的实例
  • nrOfCompletedInstances:已经完成的实例

这里设置的条件是全部完成的时候主流程继续。

妈妈,我想用try catch

activiti有两种事件,一种是流程事件,放在流程中的,一种是边界事件,挂在某个步骤上由于流程运行间接触发的,而这两种事件又分为两类,一种是捕获一种是触发。

  • 捕获(Catching):当流程执行到事件, 它会等待被触发。
  • 触发(Throwing):当流程执行到事件, 会触发一个事件。

这里要说的就是边界错误捕获事件。

如图所示,把启动应用的所有步骤都放到了一个内嵌子流程subprocess中,然后在subprocess上挂载了一个错误边界捕获事件,当子流程中任意一个步骤抛错时捕获错误触发重试按钮,如果重启则重新执行整个子流程,如果跳过这流程继续。

错误边界事件中有一个errorCode参数,errorCode用来匹配捕获的错误:

  • 如果没有设置errorRef,边界错误事件会捕获 所有错误事件,无论错误的errorCode是什么。
  • 如果设置了errorRef,并引用了一个已存的错误, 边界事件就只捕获错误代码与之相同的错误。
  • 如果设置了errorRef,但是BPMN 2.0中没有定义错误, errorRef就会当做errorCode使用 (和错误结束事件的用法类似)。

配置一个排他网关,在每条路径上写明条件

子流程每个步骤中try catch住错误,并抛出

delegateExecution.setVariableLocal(ERROR_LOCAL,stringApiResponseDTO.getError());
throw new BpmnError(ERROR_RETRY, stringApiResponseDTO.getError());

需要注意的是抛错中的错误文本好像获取不到,所以还是要用局部变量来存一下。

如何做审批和驳回

因为我想记录所有的操作步骤,所以我把每一种操作都进行了分割,因此出现了下图所示的审批流程。

第一次提交任务是在流程开始时直接执行的,然后使用了监听器在审批步骤开始前设置审批候选人,如果当前用户是审批候选人直接跳过审批流程,审批是否通过是通过排他网关加上参数控制的。

审批usertask上设置一个监听器,用来设置任务的审批候选人

@Component
public class ApprovalTaskListener implements TaskListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ApprovalTaskListener.class);

    @Autowired
    private RuntimeService runtimeService;

    @Autowired
    private ApprovalTask approvalTask;

    @Autowired
    private ApplicationService applicationService;

    @Autowired
    private TaskService taskService;

    @Override
    public void notify(DelegateTask delegateTask) {
        String processInstanceId = delegateTask.getProcessInstanceId();
        ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult();
        String appName = processInstance.getBusinessKey();
        String owner = processInstance.getStartUserId();
        Integer priority = delegateTask.getVariable(INPUT_PRIORITY, Integer.class);
        taskService.setPriority(delegateTask.getId(), priority);
        ApiResponseDTO<ApplicationDTO> application = applicationService.getApplication(appName);
        if (application.isSuccess()){
            List<String> bizMaintainer = application.getBody().getBizMaintainer();
            delegateTask.addCandidateUsers(bizMaintainer);
            //申请人是审批人时直接处理
            if (bizMaintainer.contains(owner)){
                taskService.setAssignee(delegateTask.getId(), owner);
            }
        } else {
            LOGGER.error("get application info error, pId: {}, app: {}", processInstanceId, application);
            throw new RuntimeException(application.getError());
        }
    }
}

在审批usertask外层设置exection的监听器approvalSkipListener,前面说过exection是先于usertask执行的,每个usertask外层都有一个exection,所以这个监听器可以控制usertask的行为。

@Component
public class ApprovalSkipListener implements ExecutionListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ApprovalSkipListener.class);

    @Autowired
    private RuntimeService runtimeService;

    @Autowired
    private ApplicationService applicationService;

    @Override
    public void notify(DelegateExecution execution) {
        ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
        String appName = processInstance.getBusinessKey();
        String owner = processInstance.getStartUserId();
        ApiResponseDTO<ApplicationDTO> application = applicationService.getApplication(appName);
        if (application.isSuccess()){
            List<String> bizMaintainer = application.getBody().getBizMaintainer();
            if (bizMaintainer.contains(owner)){
                execution.setVariable(SKIP_ENABLE,true);
                execution.setVariable(APPROVAL_PASS, true);
            }
        } else {
            LOGGER.error("get application info error, pId: {}, app: {}", processInstance.getId(), application);
            throw new RuntimeException(application.getError());
        }
    }
}

设置跳过条件

如果驳回了,流程又会回到提交任务那,需要用户重新提交,数据库中会出现新的记录。

说回滚就回滚

activiti有个很有意思的功能就是补偿,回滚就是用补偿触发事件和边界补偿捕获事件完成的。当补偿触发后,activiti会从后往前依次触发补偿捕获事件。

  1. 画一个补偿触发事件
  2. 画一个边界补偿捕获事件
    上图的意思是当流程走到静默期这个接收任务的时候会卡住并触发一个边界定时事件,如果定时到了就会走机器下线流程结束,如果定时没到却触发了静默期消息就会走补偿触发事件,触发应用启动,回滚成功。

问题很多慢慢解

任务怎么没记录

说到这个问题,就要提及异步任务:

  • async:标识任务是不是异步执行的,如果为true表示任务不是jobExecutor直接执行,而是生成额外的job放到工作线程池由工作线程执行。

借用官方的图来简要说明一下。activiti是通过事务的方式执行流程,当流程开始后,activit会一直推进流程直到遇到等待状态,然后把当前状态存到数据库,并等待下一次触发。如下图中usertask是第一个等待状态,定时器是第二个等待状态,而完成usertask和执行服务任务在同一个工作单元中,同一个工作单元中的成功和失败是原子的。如果服务任务成功了数据库中看到的是完成的usertask和完成的服务任务,如果服务任务出错了就会导致activti回滚事务,这个时候就会回到最初的状态,现象就是数据库中依然只有未完成的usertask的记录,而看不到服务任务的记录。

如果我想看到服务任务的执行状态呢,解法就是设置async为true,将服务任务交给后台执行,后台的JobExecutor会周期性的扫描数据库的job提交给工作线程池。如下图所示,服务任务设置了async为true,这个时候有三个等待状态,第一个是usertask,第二个是服务任务,第三个是定时器。现在流程开始,usertask完成后会创建一个等待状态的服务任务job并把他保存到数据库,然后由工作线程池执行,如果服务任务job执行失败了,这个job仍然在数据库中。而邮件任务如果失败了则会和之前一样回滚到服务任务初始状态。

为啥job不跑了

这也是经常遇到的一个坑,目前还没有解决,这涉及到排他任务。

  • exclusive:表示任务是不是排他任务,如果为true,工作线程会把同一个流程实例的排他任务一次全部拿出来顺序执行。

还是借用官方的图,假设有如下场景,三个服务任务都是异步的,jobExecutor会把这3个job分给工作线程池来执行,如果他们同时到达并发汇聚网关那就会出现一致性问题,因为每个分支都会检查其他分支是否到达就会出现一直等待的情况。

为了解决这个问题,activiti使用了乐观锁,当所有分支汇聚时他们会去更新流程实例的版本号,如果有分支更新失败则会被锁上一段时间然后重试。但是乐观锁的问题在于失败的job默认只重试三次而且job是非事务的会导致数据重复。因此又出现了排他任务,exclusive为true时保证activiti对于同一个流程实例不会同时执行两个排他任务,也就是说async和exclusive同时为true时并不是真正的并行,而6.0 exclusive默认为true。

按道理只有async为true,exclusive为false时才会job锁住不跑了,但是事实并不是这样,因此我怀疑是当多台机器同时获取job时导致的这个问题,还待后续排查。

后记

洋洋洒洒终于把activiti的总结写完了,不过是点到为止,还有很多功能没有尝试,bug也没查出来。革命尚未成功,同志仍需努力。