异步化组件的场景选型及实现

avatar
@阿里巴巴集团

作者:闲鱼技术-码宝

前景概要

在我们日常的开发过程中经常会碰到无血缘关系的流水账逻辑(数据补全, 通知逻辑等), 这个时候我们通常会采用异步化的方式去处理从而加快响应速度. 与此同时, 伴随着上下游依赖的服务变多, 对应的可能也会产生一系列的问题包括不限于问题难以排查, 性能难以保证等. 在闲鱼也不例外:

  • 补齐的串行操作(这里指串行去实现多次网络io)存在性能瓶颈, 严重影响到了接口rt.
  • 逻辑单元化程度不好, 逻辑"自由飞翔"导致代码可读性较差, 单元逻辑相关指标也难以衡量.

基于上述的描述, 我们想就此抽象出一个异步化组件去解决对应的问题. 那先给自己定几个小目标吧!

  1. 可监控: 每个单元逻辑生命周期内的所有状态和行为(成功, 失败, rt等关键指标)都在监控范围内.
  2. 容灾性: 有fallback机制, 当逻辑单元内出现大量异常(通常指的是超时)的时候能堪大用.
  3. 零成本接入: 接入使用方便, 做到拆箱即用.

举个栗子

不超时场景

假设逻辑单元L有a, b, c三个任务, 执行时长分别为t1=1, t2=2, t3=3, 超时时间为4. 如图所示:

现在希望的结果是:

  1. 该逻辑单元L执行时间为3
  2. a任务执行成功1次耗时为1, b任务执行成功1次耗时为2, c任务执行成功1次耗时为3

超时场景

假设假设逻辑单元L有a, b, c三个任务, 执行时长分别为t1=3, t2=5, t3=6, 超时时间为4. 如图所示:

现在希望的结果是:

  1. 该逻辑单元L执行时间为4
  2. a任务执行成功1次耗时为3, b任务执行失败, c任务执行失败
  3. b和c线程在失败后线程停止, 不继续占用线程池资源

方案

注意下面所有的方案都是围绕超时场景展开

方案之akka

  1. 当前服务获取对应的LogicActor
  2. LogicActor获取当前的所有UnitActor(a, b, c)
  3. 使用tell方式分别向a, b, c发消息, 注意这个过程是异步的
  4. a正常返回, b和c均超时
  5. 不管超时与否LogicActor都会去countDown当前的任务
  6. 当所有任务都完成的时候就回去combine数据
  7. 最后返回结果数据

LogicActor-逻辑单元actor, 负责分发任务和合并数据

UnitActor-具体任务的actor

优点

  1. 消息驱动.
  2. 不需要额外的线程池管理 & 异常容错.
  3. 优雅停止正在执行的worker(PoinsonPill).
  4. 熟悉akka的人可以较为容易的上手.

缺点

  1. 为了达到目标需要一层封装, 并且也不容易.
  2. 大大增加系统复杂度.
  3. 对于不熟悉scala/akka的人来说简直是灾难.

scala/akka的学习成本直接导致了"零成本接入"的目标无法达成, 故放弃.

方案之rxjava

使用rxjava来实现的逻辑

  1. 定义一个CountDownLatch来做一个超时控制
  2. 定义三个task
  3. 通过rxjava去实现异步化处理
  4. 给一个固定大小为10的线程池去处理
  5. reduce中去做累加操作
  6. latch做一个统一卡点

这边的timeout起到了整体卡点的效果, 但是并不知道timeout是哪个业务导致的

优点

  1. 响应式编程
  2. 代码量精简
  3. 屏蔽线程池的操作
  4. 封装了#timeout#onErrorReturn方法, 已存在超时处理模块无需二度封装

缺点

  1. 为了达到目标需要一层封装
  2. 存在超时处理模块, 但是业务单元的植入会破坏rxjava原有的封装且同时改造难度较大

虽然满足"容灾性"(onErrorReturn), 但是无法做到"可监控", 因为超时之后我们没有办法知道是哪个业务超时了, 故放弃.

完整方案-基于JUC包下的封装

难点

如何实现逻辑单元的监控
  1. 集团内部针对全局的traceId进行封装(ThreadLocal), 使用线程池会丢失上下文, 导致对应的日志无法被追踪到.
  2. 存在bizCode概念, 方便监控逻辑单元以及逻辑单元内的每一个操作.
如何清理超时线程, 使其不占用线程池资源

上述CallableRunnable执行超时时, 需要将其停止, 让其不继续占用线程池资源资源.

并发(超时)如何控制

控制超时的工具类有很多

  • CountDownLatch: java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)超时不会抛出异常只能通过boolean去判断是否超时, 意味着不能通过短路的异常流方式去处理.
  • CyclicBarrier: java.util.concurrent.CyclicBarrier#await(long, java.util.concurrent.TimeUnit)对于异常的封装并不友好, 例
    • 现在有四个线程a, b, c, d. 超时时间均2s. 其中d线程执行了3s, 此时会发生下面的情况
      1. 第一个完成任务进入barrier.await的会抛出timeout异常, 同时其他四个任务都会抛出broken barrier
      2. 超时任务最后会完成后续的动作, 持续占用资源
  • Semaphore: 不满足使用条件, 不过多赘述.
  • Future: java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit), futureList会串行读取, 导致T>=Max(t1,t2,t3)

一个典型的错误例子

  1. 提交future到对应的list中
  2. 通过遍历去获取对应的结果
  3. 获取到结果之后去做累加的操作
  4. 最后输出结果和执行的时间

虽然上面三个任务是并发执行, 但由于future.get(long, TimeUnit)是阻塞, timeout会在此处失效. 如上图, 最后耗时可能是3+2+1=6s.

整体方案

核心类图如下:
callable类UML图

  1. ConcurrentCallable是对外使用的类
  2. BizBaseCallable继承了Callable, 封装了内部的上下文和对应的业务单元
  3. BizCountDownLatch为了解决业务超时监控的问题, 加上了业务单元池的属性
  4. BizBaseCallable封装了ctx表示上下文, 保证在多线程的情况下上下文也不会丢失(针对集团内部的traceId的兼容)
CountDownLatch的改造-可监控

继承CountDownLatch重写#await()和countDown()方法

  1. 补充业务单元概念(bizCode)
  2. 对应的bizSet实际上是一个未完成的业务池子, 完成一个业务去掉对应的业务单元
  3. 重写countDown方法, 通过移除业务单元去表示已完成
  4. await超时的时候抛出异常, 模拟短路环境, 上层统一处理
  5. 超时异常中丢出还未完成的任务

上层监控逻辑封装

ConcurrentCallable的超时处理-容灾性

  1. 监控逻辑中包含了超时逻辑的处理
  2. Callable获取返回值的时候, get超时时长给0: 该完成的都完成了, 没完成的就超时了.
  3. 通过future.cancel去中断正在执行的callable
  4. 默认返回fallback(兜底操作), fallback默认实现为renturn null;.
  5. 这里需要塞一个默认值, 这样可以方便外层调用的时候通过list.forEach(l -> deal(l))去处理列表逻辑
再看上面那个例子-零成本接入

  1. 模拟上下文的置入, TestCtx中封装了一个ThreadLocal
  2. 4000L表示该任务的超时时间为4000ms, 该逻辑单元取名为"l"
  3. 往组件中添加任务a, b, c
  4. 最后展示了获取到数据之后的处理

  1. a结束后会在countDownLatch中移除对应的单元
  2. 超时的时候会强行结束b和c

至此就完成了一个可监控, 容灾性, 零成本接入的异步化组件.

小结

综上所述最后选择了"JUC(升级版)"作为我们的异步化组件.

本文主要介绍了闲鱼内部使用的异步化调度的组件. 业务场景衍生出的业务组件可能不具备很强的普适性, 但是希望从文中的背景, 目标, 方案选型中给出值得借鉴的思考.