枯燥的Kotlin协程三部曲(中)——应用实战篇

5,756 阅读20分钟

0x0、引言

上节《枯燥的Kotlin协程三部曲(上)——概念启蒙篇》,追根溯源,先了解并发相关的概念,尔后引出Kotlin协程:

真正的协程

  • 一种 非抢占式 / 协作式任务调度模式,程序可 主动挂起或恢复执行
  • 基于线程,相对于线程轻量很多,可理解为 用户层 模拟线程操作;
  • 上下文切换由用户去控制,避免大量中断参与,减少线程上下文切换与调度消耗的资源;

Kotlin中的「假协程

语言级别并没有实现一种 同步机制(锁),还是依靠Kotlin-JVM提供的Java关键字,锁的实现还是交给线程处理,因此Kotlin协程本质上只是一套「基于原生Java Thread API的封装」。只是这套API 隐藏了异步实现细节,让我们可以用「同步的方法来写异步操作」罢了。

关于Kotlin中的协程的分类有些争议:

没有分配函数调用栈,挂起点状态通过 状态机或闭包等语法实现,是 无栈协程 的无误, 但它又表现出 有栈协程 的特点:可在任意函数调用层及挂起,并转移调用权。

重要概念大概就这些,在学习Kotlin协程的具体API前,容笔者再给大家做些大有裨益的点思想工作。

0x1、思想工作

① 上下文环境


上下文环境 这个词你是怎么理解的?笔者的理解:完成某件事物时所需的前置资源。举个例子:

你周末早上起来,突然想吃韭菜猪肉饺,去市场买了饺子皮、韭菜和五花肉,把韭菜切好,肉绞好,准备开始 包饺子;突然基友夺命call,喊你出去 喝奶茶打王者,那么快乐的事情,怎么能不去。

不过材料都准备好了,晾着等变质或直接丢掉,显然不合理,咋能这样糟蹋粮食! 可以把材料放冰箱,浪完回来再拿出来继续包,从例子中提炼一些东西:

  • 1、包饺子和喝奶茶打王者,是两件 事物
  • 2、做包饺子这件事物的 前置条件 是准备好饺子皮,韭菜和肉馅这些材料(没有材料,包空气?)
  • 3、换句话说包饺子这件事依赖一个 外部的环境(Context),又称 上下文环境
  • 4、此时,基友喊你喝奶茶打王者,你需要停止包饺子这件事物;
  • 5、暂停包饺子这件事物 和 把材料放冰箱里,这叫 挂起(suspend)保存上下文环境
  • 6、浪完回来了继续包饺子 和 把冰箱的材料拿出来,这叫 恢复(resume)恢复上下文环境

注意

这里的 挂起和恢复,是你的 主动行为,要和 堵塞 区分开来,堵塞是 被动行为,比如煮饺子,饺子包好了,但水还没烧开,此时只能 等待

从例子里我们知道了,上下文环境完成某件事务所需的前置资源,我再举一个例子:

你突然有了三个女友,她们也喜欢吃饺子,不过各有所好,大桥未久喜欢玉米、三上悠亚喜欢白菜、桥本有菜喜欢香菜。

所以,在包韭菜馅的饺子时,你会把另外三种馅也包好,此时的事务变成四个:

包韭菜饺子、包玉米饺子、包白菜饺子、包香菜饺子

基友喊你奶茶王者,需要把这四种馅都放冰箱(挂起),问题来了,该怎么放?

直接一把梭往冰箱里怼,肯定是不好的,不好拿是其次,最怕弄乱,毕竟以后你还会有 高橋聖子山岸逢花仲村美羽小野六花松下紗榮子石原希望葵司 这些女友,是吧?一个简单的解决方法:用一个大袋子把对应的资源分别装好,而后贴个 写有女友名字的便利贴 以示区分,就不怕弄乱了~

我们通过写有女友名字的便利贴(不变性) 对 馅料资源 进行标记,以此保证了上下文环境的 唯一性。 对事物进行抽取,包饺子上下文环境如下:

女友名字的便利贴 + 肉馅 + 素馅 + 其他东西

扩展到 线程切换上下文环境,亦是如此:

线程Id + 线程状态 + 堆栈 + 寄存器状态等

扩展到Kotlin协程中的上下文环境 → CoroutineContext,以 键值对 的方式存储各种不同元素:

Job(协程唯一标识) + CoroutineDispatcher(调度器) + ContinuationInterceptor(拦截器) + CoroutineName(协程名称,一般调试时设置)

妙啊~


② 结构化并发


了解完 上下文 是完成某项事务所需的 外部环境 后,我们再来说说 结构化并发,我们都知道:

多个线程间没有 级联关系,线程执行的上下文是整个进程,并发相对整个进程而非某个父线程。

这是 线程的非结构化,而从业务的角度看:

每个并发操作都是在处理一个任务,它可能属于某个父任务,也可能有自己的子任务。每个任务拥有自己的生命周期,子任务的生命周期理应继承父任务的生命周期。

这是 业务的结构化,Kotlin中的协程就是 结构化的并发

在实际开发中,我们很少需要一个全局的协程,因为它总是跟程序中某个局部作用域相关,这个局部作用域是一个生命周期有限的实体,比如某次网络加载,新建的协程对象和父协程保持着「级联关系」。

具体表现

协程必须在作用域中才能启动,作用域中定义了一些父子协程的规则,Kotlin协程通过作用域来管控域中的所有协程。

作用域间可并列包含,组成一个树状结构,这就是Kotlin协程中的结构化并发,说下规则:

先是作用域细分,有下述三种:

  • 顶级作用域:没有父协程的协程所在的作用域;
  • 协同作用域:协程中启动新协程(子协程),此时子协程所在的作用域默认为协同作用域,子协程抛出的未捕获异常都将传递给父协程处理,父协程同时也会被取消;
  • 主从作用域:与协同作用域父子关系一致,区别在于子协程出现未捕获异常时不会向上传递给父协程。

再接着是父子协程间的规则

  • 父协程被取消,所有子协程均被取消;
  • 父协程需等待子协程执行完毕后才会最终进入完成状态,而不管父协程本身的协程体是否已执行完;
  • 子协程会继承父协程上下文中的元素,如果自身有相同Key的成员,则覆盖对应Key,覆盖效果仅在自身范围内有效。

③ 协作式取消


《Kotlin协程官方指南》中说到:取消是协作的,啥意思? 线程的取消和协程的取消在原理上很类似,先从线程的取消说起,再过渡到协程的取消。

如果取消线程

读者的第一反应是不是调用Thread实例的 stop() 或 suspend(),可以是可以,不过不建议这样做。 前者过于粗暴,线程终止前没有对其做任何清除操作,具有固有的不安全性,而后者则具有固有的死锁倾向。

一种更好的方式

在自己的Thread类中置入一个 标志-isAlive(),用于控制目标线程是活动还是停止。 如果该标志指示它要停止运行,可使其结束run()方法; 如果目标线程等待时间过长,则应使用interrupt()方法来中断该等待;

过渡到Kotlin协程,亦是如此,只是:

标志位isAlive() → isActive,中断方法interrput() → cancel()

就是这么简单,具体的取消操作详解,Job那里会讲解一波~


0x2、添加依赖

Kotlin标准库stdlib 中是不包含 Kotlin协程 的,需要 按需另外导入,如:

// 核心库:必要!公共API,使得协程在各个平台的接口得到统一
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.8"

// 平台库:当前平台对应的平台库,协程在具体平台的表现方式是有差异的(如Android)
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.8'

// 测试库:协程的测试库,方便开发者在测试中使用协程
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.8'

库的版本号必须一致,比如这里同为1.3.8,最新版本号及更多库信息可移步至官方Github仓库自行查阅: github.com/Kotlin/kotl…

:IDEA使用Maven方式导入Kotlin协程依赖:

依次点击:File → Project Structure → Modules → + → Library... → From Maven...

输入库的关键词,点击OK,然后等待下载完毕即可。


0x3、第一个官方Demo的解读

如题,官方文档中给出了第一个Kotlin协程Demo:Your first coroutine,笔者加了点料:

运行输出结果如下

讲解一波(有些名词不懂也没关系,不影响后续学习):

上节说过,Kotlin-JVM的协程是 假协程,只是对底层Thread的一次良好封装,这里通过 Thread.currentThread().name 把当前线程的名字打印出来,可以看到协程所用的线程名为: DefaultDispatcher-worker-1,盲猜 线程池,毕竟高效的多线程调度基本是离不开线程池。

GlobalScope.launch 先简单地理解为创建了一个协程,第12行比第8行先执行的原因: 创建线程池要费点时间,所以没主线程同步代码的执行速度快。

delay() 是一个 挂起函数(suspend function),可在不堵塞线程的情况下延迟协程; Thread.sleep() 则会堵塞当前线程;

suspend 挂起的意思:协程作用域被挂起,但当前 线程中协程作用域外的代码不被堵塞suspend挂起函数,只能在协程或者另外一个挂起函数中被调用。

在协程挂起(等待)时,线程会回到线程池,当等待结束,协程会从线程池中一个空闲的线程上恢复。

读者比较疑惑的问题可能是:第13行的那一句 Thread.sleep(2000L) 能去掉吗?

答:不行,这句话的作用是 堵塞主线程,JVM保活,好让协程执行完毕,如果去掉,协程里的东西没执行完,JVM就退出了。

另外再说一点:

main线程只是一个普通的用户线程,其他线程都是由main线程启动的,但在进程层面看:线程都是平级的,没有父子关系,JVM会在所有用户线程执行完毕后退出,注意是 用户线程!JVM可不会理 守护线程 生还是死,可以通过 setDaemon(true) 将线程设置为守护线程。

而打开Kotlin协程源码,全局搜下:isDaemon = true,可见一斑:

① runBlocking

另外,Kotlin还提供了一种机制来堵塞线程,可实现与Thread.sleep相同的效果,修改后的代码:

背后的机制:

runBlocking函数会建立一个 堵塞当前线程的协程,main线程会等待runBlocking中的代码执行完毕。

上述代码还可以再改进一波:

runBlocking是一个全局函数,可在任意地方调用,不过 项目中用得不多,毕竟堵塞main线程意义不大,常用于单元测试防止JVM退出。

还有一点,使用delay()函数可以起到延迟等待作用,但并非良策,实际开发中耗时任务的时间存在不确定性,可以使用Kotlin协程提供的Job(作业)来实现,这个等下会讲。


0x4、CoroutineScope → 协程作用域

① GlobalScope → 全局协程作用域

点进 GlobalScope 的源码:

定义成了一个 单例对象, 在整个JVM虚拟中只有一份对象实例,生命周期贯穿整个JVM,故使用时需要警惕 内存泄漏!!!上面讲过Kotlin协程通过作用域来实现「结构化并发」的需求,可以自定义协程作用域以满足我们的需求。

② 自定义作用域

GlobalScope继承自CoroutineScope接口,点开源码,比较简单,持有一个CoroutineContext上下文

可以让类实现这个接口,让该类称为一个协程作用域,示例如下:

2、使用MainScope()函数

为了在Android/JavaFx等场景中更方便的使用,官方提供了 MainScope() 函数快速创建基于主线程协程作用域。

使用MainScope可以很方便的控制所有它范围内的协程的取消,官方更推荐我们定义一个抽象的Activity,示例如下:

3、使用 coroutineScope()supervisorScope() 创建子作用域

注意,是创建子作用域,只能在一个已有的协程作用域中调用,前者出现异常时会把异常抛出(父协程及其他子协程会被取消),后者出现异常时不会影响其他子协程,示例如下:


0x5、创建协程 → 作用域函数

协程作用域,确定了协程间的父子关系,以及取消或异常处理等方面的传播行为。接着,可以利用作用域函数来创建协程。

① launch & async

这两个函数会创建一个「不堵塞」当前线程的新协程,区别:

  • launch返回一个「Job」,用于协程监督与取消,用于无返回值的场景。
  • async返回一个Job的子类「Deferred」,可通过await()获取完成时返回值。

简单的代码使用示例如下:

输出结果如下


0x6、suspend关键字 → 挂起函数

Kotlin协程提供了 suspend 关键字,用于定义一个 挂起函数,它就是一个 标记

当你写的普通函数需要在「某些时刻挂起和恢复」,加上他就行,其他不用你理!!!

而它的真正作用:

告知编译器,这个函数需在协程中执行,编译器会将挂起函数用「有限状态机」转换为一种优化版的回调。

抽取一波业务代码,用suspend定义挂起函数,修改后的代码如下:


0x7、Job → 作业

调用launch函数会返回一个Job对象,代表一个 协程的工作任务


① 常用API

/**
 * 协程状态
 */
isActive: Boolean    //是否存活
isCancelled: Boolean //是否取消
isCompleted: Boolean //是否完成
children: Sequence<Job> // 所有子作业

/**
 * 协程控制
 */
cancel()             // 取消协程
join()               // 堵塞当前线程直到协程执行完毕
cancelAndJoin()      // 两者结合,取消并等待协程完成
cancelChildren()     // 取消所有子协程,可传入CancellationException作为取消原因
attachChild(child: ChildJob) // 附加一个子协程到当前协程上

② 生命周期

Job的生命周期包括一系列的状态:

New(新创建)、Active(活跃)、Completing(完成中)、 Completed(已完成)、Cancelling(取消中)、Cancelled(已取消)

注意上图中的 await children,当协程处于完成中 或取消中,会等待所有子协程完成后,才进入已完成或已取消状态。


③ 取消操作详解

  • 取消作用域会取消它的所有子协程;
  • 同一作用域中,被取消的子协程不会影响其余兄弟协程;
  • 协程通过抛出一个特殊的异常CancellationException来处理取消操作,cancel函数中默认会创建一个,也可以自己构建新的实例传入,子协程因为CancellationException而被取消,父协程是不需要进行其他额外操作的;
  • 不能在已取消的作用域中再次启动新的协程;
  • 协程的取消是「协作式」的,协程不会在调用cancel()时立即停止,调用后只是进入 取消中 状态,只有工作完成后才会变成 已取消 状态,所以需要我们在代码中定期检查协程是否处于活动状态。比如下述例子:

运行输出结果如下

并没有在取消后立即停止,需要我们自己手动去判断,比如在while()循环中加入**isActive**

while(i < 5 && isActive)

也可以使用**ensureActive()来检查,该函数会在Job处于不活跃状时立即抛出异常,可以把它写在循环体的第一行。 还可以使用yield()**来检查,它的第一个操作就是检查Job是否完成,已完成会抛出CancellationException来退出协程。

协程取消后会抛出CancellationException,可以使用try/catch代码块对此异常进行捕获,在finally块中完成资源释放等清理工作。

但要注意一个问题:处于取消中状态的协程不能够挂起,finally中的代码如果涉及挂起,后续代码是不会继续执行的,可以通过 withContext + NonCancellable 来创建一个无法取消的任务,以保证清理任务的完成,示例代码如下:

运行结果如下


④ 异常处理

Kotlin中异常处理的玩法有三种,先是「try-catch直接捕获作用域内异常」,代码示例如下:

输出结果如下

:无法使用try-catch去捕获launch和async作用域的异常!!!

然后是「全局异常处理」跟Rx里的 RxJavaPlugins.setErrorHandler 捕获全局异常很相似,而全局协程作用域存在嵌套子父级关系,所以异常可能会依次抛出多个,代码示例如下:

输出结果如下

:只支持launch()传入,async()传入是无效的;全局异常处理并不能阻止协程取消,只是避免因异常而退出程序。

最后是「异常传播」,协程作用域中异常传播默认是 双向 的表现为:

  • 父协程发生异常,所有子协程都会取消;
  • 子协程发生异常,会导致父协程取消,间接导致兄弟协程也取消;

代码示例如下:

输出结果如下:

有两种方式将传播变为 单向,即子协程发生异常不会影响父协程及兄弟协程。 其中一种方式就是用 SupervisorJob 代替 Job,修改后的代码示例:

另一种是使用上面自定义作用域介绍的 supervisorScope,修改后的代码示例如下:

相同的运行结果:


⑤ 启动模式


launch & async那里截了launch和async的源码,关注第二个参数 CoroutineStart,点进源码:

public enum class CoroutineStart {
    // 默认,创建后立即开始调度,调度前被取消,直接进入取消响应状态。
    DEFAULT,
    
    // 懒加载,不会立即开始调度,需要手动调用start、join或await才会
    // 开始调度,如果调度前就被取消,协程将直接进入异常结束状态。
    LAZY,
    
    // 和Default类似,立即开始调度,在执行到一个挂起函数前不响应取消。
    // 涉及到cancle才有意义
    @ExperimentalCoroutinesApi
    ATOMIC,
    
    // 直接在当前线程执行协程体,直到遇到第一个挂起函数,才会调度到
    // 指定调度器所在的线程上执行
    @ExperimentalCoroutinesApi
    UNDISPATCHED;
}

0x8、调度器 → CoroutineDispatcher

① 四类调度器

Kotlin协程预置4种调度器,如下表所示:

种类描述
Default默认,线程池,适合处理后台计算,CPU密集型任务调度器
IOIO调度器,适合执行IO相关操作,IO密集型任务调度器
MainUI调度器,根据平台不同会初始化为对应UI线程的调度器,如Android的主线程
Unconfined不指定线程,如果子协程切换线程,接下来的代码也在该线程继续执行

另外,调度器还提供了一个属性**immediate**,如果当前处于该调度器中,不执行调度器切换直接执行。对比示例如下:

CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
	// 第一个执行
}
// 第二个执行
CoroutineScope(Job() + Dispatchers.Main).launch {
	// 调度器切换,导致慢一些所以第四个执行
}
// 第三个执行

② withContext

和launch、async及runBlocking不同,withContext不会创建新的协程,常用于 切换代码执行所运行的线程。 它也是一个挂起方法,直到结束返回结果。多个withContext是串行执行的, 所以很适合那种一个任务依赖上一个任务返回结果的情况,比如:

就很舒服,使用async+await可以是想同样的效果,只是需要创建了三个协程,有些多余:


0x9、拦截器 → ContinuationInterceptor

见名知意,就是用来拦截协程做一些 附加操作 的,比如上面的调度器,就用拦截器实现的。 写个拦截打日志的Demo试试水,这里的 Continuation 是用来保存协程挂起状态与局部变量的对象。

运行结果如下:

发生了4次拦截,依次是:协程启动时(前两个),挂起时返回结果时。 我们可以在写个简易版的线程调度器,让协程在启动时完成线程切换,示例如下:

运行结果如下:

可以看到,协程切换到自定义的线程池运行,配合withContext,运行完后又切回来了。


0x10、Deferred

看下Deferred的源码:

在继承Job的基础上,指定了<out T> 输出泛型,await()挂起协程并返回最后的执行结果。


0x11、Channel → 通道

与Java中用于解决多线程数据传递的BlockingQueue类似,Kotlin协程提供了 Channel, 用于解决多协程间的数据传递。元素从一端被加入,从另一端被消费,除了堵塞操作外, Channel还提供了非堵塞的send及receive操作。一个简单的使用代码示例如下:

另外,Receiver端支持用for迭代来接收消息,比如:for(c in channel) 也是可以的。

:用完Channel记得调用close()关闭通道,否则从Channel读取数据的协程都会无限挂起,在那里等数据传过来!!!

① 不同的Channel类型

打开Channel的源码:

继承了SendChannelReceiveChannel,然后定义了几个代表Channel类型的常量:

RENDEZVOUS → 默认,0缓存,创建了一个RendezvousChannel,send就挂起,直到被receive;
UNLIMITED → 创建了一个LinkedListChannel,无限容量,send不会挂起;
BUFFERED → 指定大小,创建了一个ArrayChannel,满了send会挂起;
CONFLATED → 创建了一个ConflatedChannel,新send的会覆盖之前send的,receiver只会得到最新的,send不会挂起;

// 创建不同类型Channel示例:
val rendezvousChannel = Channel<String>()
val unlimitedChannel = Channel<String>(UNLIMITED)
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)

② SendChannel & ReceiveChannel

SendChannel定义了往通道发送数据的接口,ReceiveChannel定义了从通道接收数据的接口,简单过下API:

/* === SendChannel === */

isClosedForSend: Boolean // 判断通道是否关闭,关闭了再发送数据会抛异常
isFull: Boolean          // 通道是否已满
close(cause: Throwable? = null): Boolean // 关闭通道
send(element: E)               // 挂起函数,通道满该函数会先暂停执行,如果通道关闭会抛异常
offer(element: E): Boolean     // 同步发送一个数据,通道满或关闭无法添加成功,返回false或抛异常
invokeOnClose(handler: (cause: Throwable?) -> Unit) // 通道关闭执行回调
onSend: SelectClause2<E, SendChannel<E>> //立即发送数据(如果允许), 在select表达式中使用

/* === ReceiveChannel === */
isClosedForReceive: Boolean // 判断通道是否关闭,关闭后还有缓存则会接收完再返回false
isEmpty: Boolean            // 通道是否为空
cancel(cause: CancellationException? = null)    // 关闭通道
receive(): E                // 挂起函数,接收数据,如果通道关闭会抛异常
receiveOrClosed(): ValueOrClosed<E> // 同上,只是通道关闭了不会抛异常,而是返回null
poll(): E                   // 从通道获取并删除一个数据,如果为空返回null,如果出错则抛出异常
onReceive: SelectClause1<E> // 用于select表达式

Channel、Flow以及实际应用后续补上,待续...


参考文献