阅读 677

Kotlin 协程探寻

一、协程指南

Kotlin 是一门仅在标准库中提供最基本底层 API 以便各种其他库能够利用协程的语言。与许多其他具有类似功能的语言不同,async 与 await 在 Kotlin 中并不是关键字,甚至都不是标准库的一部分。此外,Kotlin 的 挂起函数 概念为异步操作提供了比 future 与 promise 更安全、更不易出错的抽象。

说起来很牛掰,但是在学习前我很担心协程会像rxjava一样显得鸡肋,因为感觉使用的人数很少,并没有大规模普及开来,以后协作的时候会遇到困难,但是看了一下网上的几个介绍以后,我还是决心学习一番,先看代码后解释:

  • 协程很轻量

import kotlinx.coroutines.*

fun main() = runBlocking {
    repeat(100_000) { // 启动大量的协程
        launch {
            delay(1000L)
            print(".")
        }
    }
}
复制代码
  • 取消协程的执行

val job = launch {
    repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
        delay(500L)
    }
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancel() // 取消该作业
job.join() // 等待作业执行结束
println("main: Now I can quit.")
复制代码
  • 协程的超时

withTimeout(1300L) {
    repeat(1000) { i ->
            println("I'm sleeping $i ...")
        delay(500L)
    }
}
复制代码
  • 协程—— async 并发

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假设我们在这里做了一些有用的事
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假设我们在这里也做了一些有用的事
    return 29
}
val time = measureTimeMillis {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
复制代码

看了上面的示例,我发现使用协程可以代码让质量更好,虽然不能排除上面说的鸡肋,但是通过一个礼拜时间来学习一门新的技能,除了学习协程本身以外,这种保持学习的饥饿和培养自学的能力也是一笔不小的收获! 下面是我的学习笔记,如果大家学习的话建议直接看官方文档:点击这里

二、协程知识点

协程的知识点不多,除了指南这个概述以外,一共分为如下 8 个章节,但是我想从一共示例开始,结果 6 行示例Kotlin代码转成Java代码有 16112 行(代码太长,截屏的话不是很清楚,有兴趣的话自己试一下),看到这个体量以后,我决定还是按照官方文档一个一个概念来捋,这样会清晰一些!

 GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
        println("World!") // 在延迟后打印输出
    }
    println("Hello,") // 协程已在等待时主线程还在继续
    Thread.sleep(2000L) // 阻塞主线程 2 秒钟来保证 JVM 存活
复制代码

目录:

协程基础

取消与超时

组合挂起函数

协程上下文与调度器

异常处理与监督

通道(实验性的)

共享的可变状态与并发

Select 表达式(实验性的)

1.协程基础

桥接阻塞与非阻塞的世界

  • delay非阻塞
  • runBlocking 阻塞

阻塞与非阻塞关注的是交互双方是否可以弹性工作。假设对象 A 和对象 B 进行交互,而对象 B 对一个问题需要思考一段时间才能回复 A,那么对象 A 可以选择等待对象 B 回复,这种方式就是一种阻塞式交互,与此同时,对象 A 可以选择在对象 B 进行思考的时间去完成别的工作,等到对象 B 完成思考后再进行后续交互,这种方式就是一种非阻塞式的交互。

一般来说,阻塞与非阻塞式用来形容 CPU 消耗的。我们把 CPU 停下来等待慢操作完成以后再接着工作称为阻塞;把 CPU 在慢操作完成之前去完成其他工作,等慢操作完成后再接着工作称为非阻塞。

  • 协程与子协程
 @Test
    fun addition_isCorrect()  = runBlocking<Unit>{
        val job = GlobalScope.launch { // 启动一个新协程并保持对这个作业的引用
            delay(1000L)
            println("World!")
        }
        println("Hello,")
        job.join() // 等待直到子协程执行结束
    }
复制代码

结构化的并发

我们可以在执行操作所在的指定作用域内启动协程, 而不是像通常使用线程(线程总是全局的)那样在 GlobalScope 中启动。

白话就是协程的启动要有规范,不然一样有内存泄露的风险,但是怎么规范起来呢?就是在指定的区域启动协程,而且还要约束启动协程的方式。

  @Test
    fun addition_isCorrect()  = runBlocking{
        launch { // 启动一个新协程并保持对这个作业的引用
            delay(1000L)
            println("World!")
        }
        println("Hello,")

    }
复制代码

作用域构建器

白话就是结构化的并发时,作用域的指定有两种方式,一种是阻塞式的,一种是非阻塞式的,示例代码如下:

 fun addition_isCorrect()  = runBlocking{
        launch {
            delay(200L)
            println("Task from runBlocking")
        }

        coroutineScope { // 创建一个协程作用域
            launch {
                delay(500L)
                println("Task from nested launch")
            }

            delay(100L)
            println("Task from coroutine scope") // 这一行会在内嵌 launch 之前输出
        }

        println("Coroutine scope is over") // 这一行在内嵌 launch 执行完毕后才输出

    }
复制代码

提取函数重构

这个和dart语言很接近,就是在异步的方法中,将部分代码提出来的时候需要添加一个关键字suspend,告诉代码这个是异步方法,具体的话看代码一目了然:

   @Test
    fun addition_isCorrect()  = runBlocking{
        launch { doWorld() }
        println("Hello,")

    }

    // 这是你的第一个挂起函数
    suspend fun doWorld() {
        delay(1000L)
        println("World!")
    }
复制代码

注意: 但是如果提取出的函数包含一个在当前作用域中调用的协程构建器的话,该怎么办? 在这种情况下,所提取函数上只有 suspend 修饰符是不够的。为 CoroutineScope 写一个 doWorld 扩展方法是其中一种解决方案,但这可能并非总是适用,因为它并没有使 API 更加清晰。 惯用的解决方案是要么显式将 CoroutineScope 作为包含该函数的类的一个字段, 要么当外部类实现了 CoroutineScope 时隐式取得。 作为最后的手段,可以使用 CoroutineScope(coroutineContext),不过这种方法结构上不安全, 因为你不能再控制该方法执行的作用域。只有私有 API 才能使用这个构建器。

全局协程

启动一个长期运行的协程,该协程每秒输出“I'm sleeping”两次,之后在主函数中延迟一段时间后返回,例如获取验证码的倒计时: 此处主要是告诉我们,其实协程也可以像线程一样使用。

   @Test
    fun addition_isCorrect()  = runBlocking{
        GlobalScope.launch {
            var count = 5
            repeat(5) { i ->
                println("倒计时 ${count-i} 秒")
                delay(1000L)
            }
        }
        delay(5000L)

    }
复制代码

运行日志:

倒计时 5 秒
倒计时 4 秒
倒计时 3 秒
倒计时 2 秒
倒计时 1 秒
复制代码

这个时候如果我们要退出这个页面呢,当前的协程又该如何取消呢?这里就谈到了下一章节了。

协程的取消与超时

上面谈到协程的取消,话说当我们离开的时候该如何取消协程?我们知道后台ServicesThread的区别在于前者可以关闭线程,后者只能被动的等待被回收。这里我们的协程也能在部分情况下被取消(关闭线程的意思)的,先看看满足条件的情景:比如我们像上面这种情况的倒计时播放广告,但是当我们点击跳过的时候要离开当前页面,此时的协程我们要取消的,防止内存泄露。

取消协程的执行

这个时候可以这么写:

    @Test
    fun addition_isCorrect()  = runBlocking{
        val job = launch {
            var count = 5
            repeat(5) { i ->
                println("倒计时 ${count-i} 秒")
                delay(1000L)
            }
        }
        delay(3000L)
        job.cancel()
//        job.join()

    }
复制代码

运行结果:

倒计时 5 秒
倒计时 4 秒
倒计时 3 秒
复制代码

那么问题来了,上面是可被取消协程的情况,那么不可被取消的协程是什么样的呢?

 @Test
    fun addition_isCorrect()  = runBlocking{
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5) { // 一个执行计算的循环,只是为了占用 CPU
                // 每秒打印消息两次
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            }
        }
        delay(1300L) // 等待一段时间
        println("main: I'm tired of waiting!")
        job.cancelAndJoin() // 取消一个作业并且等待它结束
        println("main: Now I can quit.")

    }
复制代码

运行结果:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.
复制代码

这里协程取消失败,因为我们在输入main: I'm tired of waiting!以后应该直接退出main: Now I can quit.,但是我们在命令退出以后还继续多运行了两个 500 毫秒之后才main: Now I can quit.。那么原因是什么啊?

如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的.

使计算代码可取消

  • 显示检查取消状态
@Test
    fun addition_isCorrect()  = runBlocking{
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5 && isActive) { // 一个执行计算的循环,只是为了占用 CPU
                // 每秒打印消息两次
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            }
        }
        delay(1300L) // 等待一段时间
        println("main: I'm tired of waiting!")
        job.cancelAndJoin() // 取消一个作业并且等待它结束
        println("main: Now I can quit.")

    }
复制代码

运行结果:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
复制代码

注意:

isActive 是一个可以被使用在 CoroutineScope 中的扩展属性。

  • 定期调用挂起函数来检查取消 这种方法我没有理解也没有找到相关源码,请熟悉的大佬指点一二,官网文档:

suspend fun yield(): Unit (source) Yields a thread (or thread pool) of the current coroutine dispatcher to other coroutines to run. If the coroutine dispatcher does not have its own thread pool (like Dispatchers.Unconfined) then this function does nothing, but checks if the coroutine Job was completed. This suspending function is cancellable. If the Job of the current coroutine is cancelled or completed when this suspending function is invoked or while this function is waiting for dispatching, it resumes with CancellationException.

有道译文:

将当前协同程序调度程序的线程(或线程池)生成到要运行的其他协同程序。如果协调程序调度程序没有自己的线程池(如dispatcher . unrestricted),那么这个函数什么也不做,只检查是否完成了协调程序任务。这个挂起函数是可取消的。如果当前协同程序的作业在调用此挂起函数时被取消或完成,或者在此函数等待分派时被完成,则它将带CancellationException恢复。

取消协程释放资源

取消协程时候,应该在什么地方释放资源?答:在 finally 中释放资源

val job = launch {
    try {
        repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        println("job: I'm running finally")
    }
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并且等待它结束
println("main: Now I can quit.")
复制代码

运行结果:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.
复制代码

运行不能取消的代码块

在前一个例子中任何尝试在 finally 块中调用挂起函数的行为都会抛出 CancellationException,因为这里持续运行的代码是可以被取消的。通常,这并不是一个问题,所有良好的关闭操作(关闭一个文件、取消一个作业、或是关闭任何一种通信通道)通常都是非阻塞的,并且不会调用任何挂起函数。然而,在真实的案例中,当你需要挂起一个被取消的协程,你可以将相应的代码包装在 withContext(NonCancellable) {……} 中,并使用 withContext 函数以及 NonCancellable上下文.

看了半天,最后才明白,释放资源的时候,如果关闭的协程需要异步操作,可以使用 withContext(NonCancellable) {……},具体如示例:

 @Test
    fun addition_isCorrect()  = runBlocking{

        val job = launch {
            try {
                repeat(1000) { i ->
                    println("job: I'm sleeping $i ...")
                    delay(500L)
                }
            } finally {
                withContext(NonCancellable) {
                    println("job: I'm running finally")
                    delay(1000L)
                    println("job: And I've just delayed for 1 sec because I'm non-cancellable")
                }
            }
        }
        delay(1300L) // 延迟一段时间
        println("main: I'm tired of waiting!")
        job.cancelAndJoin() // 取消该作业并等待它结束
        println("main: Now I can quit.")

    }
复制代码

运行结果:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
job: And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.
复制代码

超时

既然可以取消一个协程,那么能不能在规定的时间内自动结束呢?答案是肯定的,超过这个时间了,还可以把当前的执行结果一并返回给你,代码如下:

   @Test
    fun addition_isCorrect()  = runBlocking{

        val result = withTimeoutOrNull(1300L) {
            repeat(5) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
            "Done" // 在它运行得到结果之前取消它
        }
        println("Result is $result")

    }
复制代码

运行结果:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
复制代码

通道

延期的值提供了一种便捷的方法使单个值在多个协程之间进行相互传输。 通道提供了一种在流中传输值的方法。

通道基础`

所谓通道基础,就是Channel在协程内作为通道可以从多个协程之间进行相互传输,示例如下:

 @Test
    fun addition_isCorrect()  = runBlocking{

        val channel = Channel<Int>()
        launch {
            // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
            for (x in 1..5) channel.send(x * x)
        }
// 这里我们打印了 5 次被接收的整数:
        repeat(5) { println(channel.receive()) }
        println("Done!")

    }
复制代码

执行结果:

1
4
9
16
25
Done!
复制代码

如示例,Channel在子协程内send了五次数据,在协程内重复五次receive。但是如果子协程send五次协程receive三次,或者子协程send三次协程receive五次,会怎么样呢? 答案是Channel会被挂起。 那么,怎么才能关闭Channel呢?

关闭与迭代通道

在实际项目中一般情况是channel在数据源中确认send次数是确定的,在channelreceive的次数无法确定,此时我们在数据源send完所有数据之后可以进行close方法,然后在通道的另一端进行迭代,当close之后迭代器就会进行自动停止。是不是有些绕?那就看代码吧!

    @Test
    fun addition_isCorrect()  = runBlocking{

        val channel = Channel<Int>()
        launch {
            // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
            for (x in 1..6) channel.send(x * x)
            channel.close()
        }
        for (y in channel){ println(y) }
        println("Done!")

    }
复制代码

执行结果:

1
4
9
16
25
36
Done!
复制代码

构建通道生产者

生产者——消费者 模式的一部分,并且经常能在并发的代码中看到它。 你可以将生产者抽象成一个函数,并且使通道作为它的参数,但这与必须从函数中返回结果的常识相违悖。

转成人话就是说,可以把通道的生成方法以函数的方式创建,示例如下:

@Test
    fun addition_isCorrect()  = runBlocking{

        val squares = produceSquares()
        squares.consumeEach { println(it) }
        println("Done!")

    }

    // 协程构建函数
    fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
        for (x in 1..5) send(x * x)
    }
复制代码

运行结果:

1
4
9
16
25
Done!
复制代码

管道

无极生太极,太极生两仪,两仪生四象,四象生八卦,八卦生万象……不明白什么意思?看看官方文档:

管道是一种一个协程在流中开始生产可能无穷多个元素的模式,示例如下:

    @Test
    fun addition_isCorrect()  = runBlocking{

        val numbers = produceNumbers() // 从 1 开始生产整数
        val squares = square(numbers) // 对整数做平方
        for (i in 1..5) println(squares.receive()) // 打印前 5 个数字
        println("Done!") // 我们的操作已经结束了
        coroutineContext.cancelChildren() // 取消子协程

    }

    fun CoroutineScope.produceNumbers() = produce<Int> {
        var x = 1
        while (true) send(x++) // 从 1 开始的无限的整数流
    }

    fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
        for (x in numbers) send(x * x)
    }
复制代码

执行结果:

1
4
9
16
25
Done!
复制代码

如果看不明白,那就再看一个示例:

    @Test
    fun addition_isCorrect()  = runBlocking{

        var cur = numbersFrom(2)
        for (i in 1..10) {
            val prime = cur.receive()
            println(prime)
            cur = filter(cur, prime)
        }
        coroutineContext.cancelChildren() // 取消所有的子协程来让主协程结束    

    }


    fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
        var x = start
        while (true) send(x++) // 从 start 开始过滤整数流
    }

    fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
        for (x in numbers) if (x % prime != 0) send(x)
    }
复制代码

执行结果:

2
3
5
7
11
13
17
19
23
29
复制代码

上面通道示例中的代码,当数据源太多了,我们仅需要一部分数据的时候,这个时候就可以如此处示例中这样,处理完需要的数据后cancelChildren

扇出

多个协程也许会接收相同的管道,在它们之间进行分布式工作。 所谓的扇出,类似于我们平时使用算法来获取具体的数据,而扇出就是这样的一种算法。先看看具体示例:

    @Test
    fun addition_isCorrect()  = runBlocking{

        val producer = produceNumbers()
        repeat(5) { launchProcessor(it, producer) }
        delay(950)
        producer.cancel() // 取消协程生产者从而将它们全部杀死

    }


    fun CoroutineScope.produceNumbers() = produce<Int> {
        var x = 1 // start from 1
        while (true) {
            send(x++) // 产生下一个数字
            delay(100) // 等待 0.1 秒
        }
    }

    fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
        for (msg in channel) {
            println("Processor #$id received $msg")
        }
    }
复制代码

执行结果:

Processor #0 received 1
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
Processor #3 received 5
Processor #4 received 6
Processor #0 received 7
Processor #1 received 8
Processor #2 received 9
Processor #3 received 10
复制代码

我们分析一下结果, 0 号子协程执行了3次,1号、2号、3号子协程执行了2次,4号子协程执行了1次,如果不是很清楚,我们接着看看扇入是什么样的一个结果?

扇入

多个协程可以发送到同一个通道。

直接看示例:

 @Test
    fun addition_isCorrect()  = runBlocking{

        val channel = Channel<String>()
        launch { sendString(channel, "foo", 200L) }
        launch { sendString(channel, "BAR!", 500L) }
        repeat(6) { // 接收前六个
            println(channel.receive())
        }
        coroutineContext.cancelChildren() // 取消所有子协程来让主协程结束

    }


    suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
        while (true) {
            delay(time)
            channel.send(s)
        }
    }
复制代码

执行结果:

foo
foo
BAR!
foo
foo
BAR!
复制代码

此时我们再回头看看扇入和扇出,前者指的是一个管道输出,多个管道接收,后者恰恰相反,指的是多个管道输出,一个管道接收。

带缓冲的通道

RXJava中,当观察者快速发送数据流,被观察者处理不过来的时候有背压的概念,我们协程的通道也有缓存的api。当缓冲达到最大容量以后,该协程将被挂起,直至协程被关闭。示例如下:

  @Test
    fun addition_isCorrect()  = runBlocking{

        val channel = Channel<Int>(4) // 启动带缓冲的通道
        val sender = launch { // 启动发送者协程
            repeat(10) {
                println("Sending $it") // 在每一个元素发送前打印它们
                channel.send(it) // 将在缓冲区被占满时挂起
            }
        }
        // 没有接收到东西……只是等待……
        delay(1000)
        sender.cancel() // 取消发送者协程

    }
复制代码

执行结果:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
复制代码

结果分析:前四个元素被加入到了缓冲区并且发送者在试图发送第五个元素的时候被挂起。

通道是公平的

发送和接收操作是公平的并且尊重调用它们的多个协程。它们遵守先进先出原则,可以看到第一个协程调用receive 并得到了元素。

看示例:

data class Ball(var hits: Int)
   @Test
    fun addition_isCorrect()  = runBlocking{

        val table = Channel<Ball>() // 一个共享的 table(桌子)
        launch { player("乒", table) }
        launch { player("乓", table) }
        table.send(Ball(0)) // 乒乓球
        delay(1000) // 延迟 1 秒钟
        coroutineContext.cancelChildren() // 游戏结束,取消它们

    }


    suspend fun player(name: String, table: Channel<Ball>) {
        for (ball in table) { // 在循环中接收球
            ball.hits++
            println("$name $ball")
            delay(300) // 等待一段时间
            table.send(ball) // 将球发送回去
        }
    }
复制代码

执行结果:

乒 Ball(hits=1)
乓 Ball(hits=2)
乒 Ball(hits=3)
乓 Ball(hits=4)
复制代码

结果分析:

在上面的例子中两个协程“乒”和“乓”都从共享的“桌子”通道接收到这个“球”元素。“乒”协程首先被启动,所以它首先接收到了球。甚至虽然“乒” 协程在将球发送会桌子以后立即开始接收,但是球还是被“乓” 协程接收了,因为它一直在等待着接收球。

注意: 有时候通道执行时由于执行者的性质而看起来不那么公平。不公平示例

计时器通道

计时器通道是一种特别的会合通道,每次经过特定的延迟都会从该通道进行消费并产生Unit。 虽然它看起来似乎没用,它被用来构建分段来创建复杂的基于时间的 produce 管道和进行窗口化操作以及其它时间相关的处理。 可以在 select 中使用计时器通道来进行“打勾”操作。

说实话,从示例代码来看,我没有想到计时器通道在项目中的应用情景,但是不妨碍我知道协程有这么个东西,看看示例:

    @Test
    fun addition_isCorrect()  = runBlocking{

        val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //创建计时器通道
        var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
        println("Initial element is available immediately: $nextElement") // 初始尚未经过的延迟

        nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // 所有随后到来的元素都经过了 100 毫秒的延迟
        println("Next element is not ready in 50 ms: $nextElement")

        nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
        println("Next element is ready in 100 ms: $nextElement")

        // 模拟大量消费延迟
        println("Consumer pauses for 150ms")
        delay(150)
        // 下一个元素立即可用
        nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
        println("Next element is available immediately after large consumer delay: $nextElement")
        // 请注意,`receive` 调用之间的暂停被考虑在内,下一个元素的到达速度更快
        nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
        println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

        tickerChannel.cancel() // 表明不再需要更多的元素

    }
复制代码

执行结果:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 60 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
复制代码

组合挂起函数

默认顺序调用

这里指的是,当有多个协程的时候,协程会一个一个的按默认的代码顺序来执行,我们可以根据第一个协程的结果来决定是否要启动第二个协程,示例也比较简单,就是先假设去获取用户信息,然后获取我的消息,并把我的消息和我的信息进行加工:

    @Test
    fun addition_isCorrect()  = runBlocking{

        val time = measureTimeMillis {
            val one = getUsefulInfo()
            val two = getMyMessage()
            println("The answer is ${one + two}")
        }
        println("Completed in $time ms")

    }


    suspend fun getUsefulInfo(): Int {
        delay(1000L) // 假设我们在这里做了些有用的事
        return 13
    }

    suspend fun getMyMessage(): Int {
        delay(1000L) // 假设我们在这里也做了一些有用的事
        return 29
    }
复制代码

执行结果:

The answer is 42
Completed in 2012 ms
复制代码

这里的measureTimeMillis指的是测量当前协程运行的时间:Executes the given [block] and returns elapsed time in milliseconds.

使用 async 并发

其实上面的代码是物理中串联的方式依次执行,我们可以继续优化,使其并联——并发执行

在概念上,async 就类似于 launch。它启动了一个单独的协程,这是一个轻量级的线程并与其它所有的协程一起并发的工作。不同之处在于 launch 返回一个 Job 并且不附带任何结果值,而 async 返回一个 Deferred —— 一个轻量级的非阻塞 future, 这代表了一个将会在稍后提供结果的 promise。你可以使用 .await() 在一个延期的值上得到它的最终结果, 但是 Deferred 也是一个 Job,所以如果需要的话,你可以取消它。

如果你知道dart的异步async/await,此处就会很熟悉,我们直接看代码:

    @Test
    fun addition_isCorrect()  = runBlocking{

        val time = measureTimeMillis {
            val one = async { getUsefulInfo() }
            val two = async { getMyMessage() }
            println("The answer is ${one.await() + two.await()}")
        }
        println("Completed in $time ms")

    }


    suspend fun getUsefulInfo(): Int {
        delay(1000L) // 假设我们在这里做了些有用的事
        return 13
    }

    suspend fun getMyMessage(): Int {
        delay(1000L) // 假设我们在这里也做了一些有用的事
        return 29
    }
复制代码

执行结果:

The answer is 42
Completed in 1034 ms
复制代码

并行结果是协程的效率提升了,由耗时由2012毫秒提升到1034毫秒了

惰性启动的 async

惰性启动并发协程,意思是当我们并发启动协程的时候,又想控制协程启动的逻辑,这个时候可以使用惰性启动 async 示例代码:

    @Test
    fun addition_isCorrect()  = runBlocking{

        val time = measureTimeMillis {
            val one = async(start = CoroutineStart.LAZY) { getUsefulInfo() }
            val two = async (start = CoroutineStart.LAZY){getMyMessage() }
            // 执行一些计算
            one.start() // 启动第一个
            two.start() // 启动第二个
            println("The answer is ${one.await() + two.await()}")
        }
        println("Completed in $time ms")

    }


    suspend fun getUsefulInfo(): Int {
        delay(1000L) // 假设我们在这里做了些有用的事
        return 13
    }

    suspend fun getMyMessage(): Int {
        delay(1000L) // 假设我们在这里也做了一些有用的事
        return 29
    }
复制代码

运行结果:

The answer is 42
Completed in 1039 ms
复制代码

这里好像和我们在dart中使用async不一样,那么我们不start直接await会怎么样呢?

   @Test
    fun addition_isCorrect()  = runBlocking{

        val time = measureTimeMillis {
            val one = async(start = CoroutineStart.LAZY) { getUsefulInfo() }
            val two = async (start = CoroutineStart.LAZY){getMyMessage() }
            // 执行一些计算
//            one.start() // 启动第一个
//            two.start() // 启动第二个
            println("The answer is ${one.await() + two.await()}")
        }
        println("Completed in $time ms")

    }
复制代码

运行结果:

The answer is 42
Completed in 2040 ms
复制代码

这个结果肯定和我们预期不一样,那么问题出在哪里了呢?

注意,如果我们只是在println中调用wait,而没有首先调用单个协程的start,这将导致顺序行为,因为wait启动了协程执行并等待它的结束,这不是惰性的预期用例。异步用例(start = CoroutineStart.LAZY)在值的计算涉及挂起函数的情况下替代了标准的lazy函数。

Note that if we just call await in println without first calling start on individual coroutines, this will lead to sequential behavior, since await starts the coroutine execution and waits for its finish, which is not the intended use-case for laziness. The use-case for async(start = CoroutineStart.LAZY) is a replacement for the standard lazy function in cases when computation of the value involves suspending functions.

async 风格的函数

上面的代码我们还可以根据api的支持,进一步优化,换一种风格,示例如下:

    @Test
    fun addition_isCorrect()  = runBlocking{

        val time = measureTimeMillis {
            // 我们可以在协程外面启动异步执行
            val one = somethingUsefulOneAsync()
            val two = somethingUsefulTwoAsync()
            // 但是等待结果必须调用其它的挂起或者阻塞
            // 当我们等待结果的时候,这里我们使用 `runBlocking { …… }` 来阻塞主线程
            runBlocking {
                println("The answer is ${one.await() + two.await()}")
            }
        }
        println("Completed in $time ms")

    }
    fun somethingUsefulOneAsync() = GlobalScope.async {
        getUsefulInfo()
    }

    fun somethingUsefulTwoAsync() = GlobalScope.async {
        getMyMessage()
    }

    suspend fun getUsefulInfo(): Int {
        delay(1000L) // 假设我们在这里做了些有用的事
        return 13
    }

    suspend fun getMyMessage(): Int {
        delay(1000L) // 假设我们在这里也做了一些有用的事
        return 29
    }
复制代码

运行结果:

The answer is 42
Completed in 1031 ms
复制代码

使用 async 的结构化并发

由于 async 被定义为了 CoroutineScope 上的扩展,我们需要将它写在作用域内,并且这是 coroutineScope 函数所提供的。这种情况下,如果在 concurrentSum 函数内部发生了错误,并且它抛出了一个异常, 所有在作用域中启动的协程都会被取消。 因此修改后的示例如下:

// 如果在 `concurrentSum` 函数内部发生了错误,只有发生错误的协程被关闭,其它协程一样可以继续执行
    @Test
    fun addition_isCorrect()  = runBlocking{

        runBlocking<Unit> {
            try {
                failedConcurrentSum()
            } catch(e: ArithmeticException) {
                println("Computation failed with ArithmeticException")
            }
        }
    }
    suspend fun failedConcurrentSum(): Int = coroutineScope {
        val one = async<Int> {
            try {
                delay(Long.MAX_VALUE) // 模拟一个长时间的运算
                42
            } finally {
                println("First child was cancelled")
            }
        }
        val two = async<Int> {
            println("Second child throws an exception")
            throw ArithmeticException()
        }
        one.await() + two.await()
    }
复制代码

运行结果:

Second child throws an exception
First child was cancelled
Computation failed with ArithmeticException
复制代码

协程上下文与调度器

目录:

  • 调度器与线程
  • 非受限调度器 vs 受限调度器
  • 调试协程与线程
  • 在不同线程间跳转
  • 上下文中的作业
  • 子协程
  • 父协程的职责
  • 命名协程以用于调试
  • 组合上下文中的元素
  • 协程作用域
  • 线程局部数据

本节有11个知识点,感觉应该是很重要的一章,咱们一个一个来看看吧!

1.调度器与线程

协调程序上下文包括一个协调程序调度器(请参阅协调程序调度器),它确定相应的协调程序在执行时使用哪个或多个线程。协调程序调度程序可以将协调程序的执行限制在特定的线程中,将其分派到线程池中,或者让它无限制地运行。 The coroutine context includes a coroutine dispatcher (see CoroutineDispatcher) that determines what thread or threads the corresponding coroutine uses for its execution. The coroutine dispatcher can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined.

如果看不懂的话,我们就来看看具体的代码示例吧!

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        runBlocking {
            launch { // 运行在父协程的上下文中,即 runBlocking 主协程
                println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
            }
            launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
                println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
            }
            launch(Dispatchers.Default) { // 将会获取默认调度器
                println("Default               : I'm working in thread ${Thread.currentThread().name}")
            }
            launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得一个新的线程
                println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
            }
        }
    }
复制代码

这个代码在测试里面用不了,只能在Activity里面运行了,不过我们最后关注的是运行结果:

Unconfined            : I'm working in thread main  // 当前协程的线程
main runBlocking      : I'm working in thread main  // 特殊调度器,后文讲
Default               : I'm working in thread DefaultDispatcher-worker-2    // 获取线程池默认线程
newSingleThreadContext: I'm working in thread MyOwnThread   // 开启专用工作线程
复制代码

通过上面的示例和运行结果,我们能看到调度器与线程就是线程切换的精髓,一般情况下我们可以将线程的调度安排直接交给协程,如果有特殊需要,也可以指定。

非受限调度器 vs 受限调度器

调度程序。无约束协同程序调度程序在调用方线程中启动一个协同程序,但只在第一个挂起点之前启动。挂起之后,它将恢复线程中的协程,该协程完全由调用的挂起函数决定。unrestricted dispatcher适用于既不消耗CPU时间也不更新任何共享数据(比如UI)的协程。

另一方面,在默认情况下,dispatcher是从外部CoroutineScope继承的。特别是,runBlocking协同程序的默认调度程序被限制在调用程序线程中,因此继承它的效果是将执行限制在这个线程中,使用可预测的FIFO调度。

有点绕,我们先看看示例代码:

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        runBlocking {
            launch(Dispatchers.Unconfined) { // 非受限的——将和主线程一起工作
                println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
                delay(500)
                println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
            }
            launch { // 父协程的上下文,主 runBlocking 协程
                println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
                delay(1000)
                println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
            }
        }
    }
复制代码

运行结果:

I/System.out: Unconfined      : I'm working in thread main
I/System.out: main runBlocking: I'm working in thread main
I/System.out: Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
I/System.out: main runBlocking: After delay in thread main

复制代码

协程与上下文继承自runBlocking{…}继续在主线程中执行,而unrestricted则继续在延迟函数使用的默认执行器线程中执行。

注意: 非受限的调度器是一种高级机制,可以在某些极端情况下提供帮助而不需要调度协程以便稍后执行或产生不希望的副作用, 因为某些操作必须立即在协程中执行。 非受限调度器不应该被用在通常的代码中。

调试协程与线程

协程可以在一个线程上挂起并在其它线程上恢复。 甚至一个单线程的调度器也是难以弄清楚协程在何时何地正在做什么事情。使用通常调试应用程序的方法是让线程在每一个日志文件的日志声明中打印线程的名字。这种特性在日志框架中是普遍受支持的。但是在使用协程时,单独的线程名称不会给出很多协程上下文信息。 示例代码:

    @Test
    fun addition_isCorrect()  = runBlocking{

        val a = async {
            log("I'm computing a piece of the answer")
            6
        }
        val b = async {
            log("I'm computing another piece of the answer")
            7
        }
        log("The answer is ${a.await() * b.await()}")
    }

    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
复制代码

运行结果:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
复制代码

log函数在方括号中打印线程的名称,您可以看到它是主线程,后面附加了当前正在执行的协程的标识符。当调试模式打开时,这个标识符被连续分配给所有创建的协程。 The log function prints the name of the thread in square brackets, and you can see that it is the main thread with the identifier of the currently executing coroutine appended to it. This identifier is consecutively assigned to all created coroutines when the debugging mode is on.

关于调试这一点我是理解的,因为RxJava在线程切换的过程中也是一样不容易调试的。

在不同线程间跳转

它演示了一些新技术。其中一个使用 runBlocking 来显式指定了一个上下文,并且另一个使用 withContext 函数来改变协程的上下文,而仍然驻留在相同的协程中,

    @Test
    fun addition_isCorrect(){

        newSingleThreadContext("Ctx1").use { ctx1 ->
            newSingleThreadContext("Ctx2").use { ctx2 ->
                runBlocking(ctx1) {
                    log("Started in ctx1")
                    withContext(ctx2) {
                        log("Working in ctx2")
                    }
                    log("Back to ctx1")
                }
            }
        }
    }

    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
复制代码

运行结果:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
复制代码

上下文中的作业

协同程序的Job是其上下文的一部分,可以使用coroutineContext[Job]表达式从中检索:

    @Test
    fun addition_isCorrect()  = runBlocking{

        println("My job is ${coroutineContext[Job]}")
    }
复制代码

调试模式输出结果如下:

My job is "coroutine#1":BlockingCoroutine{Active}@3aa9e816
复制代码

注意CoroutineScope 中的 isActive 只是 coroutineContext[Job]?.isActive == true 的一种方便的快捷方式。

子协程

当一个协程被其它协程在 CoroutineScope 中启动的时候, 它将通过 CoroutineScope.coroutineContext 来承袭上下文,并且这个新协程的 Job 将会成为父协程作业的 子 作业。当一个父协程被取消的时候,所有它的子协程也会被递归的取消。

    @Test
    fun addition_isCorrect()  = runBlocking{

        // 启动一个协程来处理某种传入请求(request)
        val request = launch {
            // 孵化了两个子作业, 其中一个通过 GlobalScope 启动
            GlobalScope.launch {
                println("job1: I run in GlobalScope and execute independently!")
                delay(1000)
                println("job1: I am not affected by cancellation of the request")
            }
            // 另一个则承袭了父协程的上下文
            launch {
                delay(100)
                println("job2: I am a child of the request coroutine")
                delay(1000)
                println("job2: I will not execute this line if my parent request is cancelled")
            }
        }
        delay(500)
        request.cancel() // 取消请求(request)的执行
        delay(1000) // 延迟一秒钟来看看发生了什么
        println("main: Who has survived request cancellation?")
    }
复制代码

运行结果:

job1: I run in GlobalScope and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
复制代码

父协程的职责

一个父协程总是等待所有的子协程执行结束。父协程并不显式的跟踪所有子协程的启动,并且不必使用 Job.join 在最后的时候等待它们。这个很好理解,我们看看示例

       @Test
    fun addition_isCorrect()  = runBlocking{

        // 启动一个协程来处理某种传入请求(request)
        val request = launch {
            repeat(3) { i -> // 启动少量的子作业
                launch  {
                    delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒的时间
                    println("Coroutine $i is done")
                }
            }
            println("request: I'm done and I don't explicitly join my children that are still active")
        }
        request.join() // 等待请求的完成,包括其所有子协程
        println("Now processing of the request is complete")
    }
复制代码

执行结果:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
复制代码

命名协程以用于调试

自动分配的id很好,当协同程序经常记录日志时,您只需要关联来自相同协同程序的日志记录。然而,当协程被绑定到特定请求的处理或执行特定的后台任务时,为了调试的目的,最好显式地为它命名。CoroutineName上下文元素的作用与线程名相同。当调试模式打开时,它包含在执行这个协程的线程名中。 Automatically assigned ids are good when coroutines log often and you just need to correlate log records coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request or doing some specific background task, it is better to name it explicitly for debugging purposes. The CoroutineName context element serves the same purpose as the thread name. It is included in the thread name that is executing this coroutine when the debugging mode is turned on.

意思是协程和线程一样也有自己的ID,但是我们也可以显示的对它进行命名,如示例:

 // 启动一个协程来处理某种传入请求(request)
        val request = launch {
            repeat(3) { i -> // 启动少量的子作业
                launch  {
                    delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒的时间
                    println("Coroutine $i is done")
                }
            }
            println("request: I'm done and I don't explicitly join my children that are still active")
        }
        request.join() // 等待请求的完成,包括其所有子协程
        println("Now processing of the request is complete")
复制代码

运行结果:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
复制代码

组合上下文中的元素

有时我们需要在协程上下文中定义多个元素。我们可以使用 + 操作符来实现。 比如说,我们可以显式指定一个调度器来启动协程并且同时显式指定一个命名:

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        runBlocking {
            launch(Dispatchers.Default + CoroutineName("test")) {
                println("I'm working in thread ${Thread.currentThread().name}")
            }
        }
    }
复制代码

运行结果:

System.out: I'm working in thread DefaultDispatcher-worker-2
复制代码

协程作用域

让我们把我们对上下文、子协程和业务放在一起。假设我们的应用程序有一个具有生命周期的对象,但是该对象不是协程。例如,我们正在编写一个Android应用程序,并在Android活动的上下文中启动各种协作程序,以执行异步操作来获取和更新数据、执行动画等。当销毁活动时,必须取消所有这些协程,以避免内存泄漏。当然,我们可以手动操作上下文和作业来绑定活动及其协程的生命周期,但是kotlinx。协程提供了一个抽象,它封装了:CoroutineScope。您应该已经熟悉了协程作用域,因为所有协程构建器都声明为它的扩展。

Let us put our knowledge about contexts, children and jobs together. Assume that our application has an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch and update data, do animations, etc. All of these coroutines must be cancelled when the activity is destroyed to avoid memory leaks. We, of course, can manipulate contexts and jobs manually to tie the lifecycles of the activity and its coroutines, but kotlinx.coroutines provides an abstraction encapsulating that: CoroutineScope. You should be already familiar with the coroutine scope as all coroutine builders are declared as extensions on it.

我们通过创建一个 CoroutineScope 实例来管理协程的生命周期,并使它与 activit 的生命周期相关联。CoroutineScope 可以通过 CoroutineScope() 创建或者通过MainScope() 工厂函数。前者创建了一个通用作用域,而后者为使用 Dispatchers.Main 作为默认调度器的 UI 应用程序 创建作用域

package com.vincent.coroutinesapplication

import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import kotlinx.coroutines.*

class MainActivity : CoroutineScope by CoroutineScope(Dispatchers.Default) {

    fun destroy() {
        cancel() // Extension on CoroutineScope
    }
    // 继续运行……

    // class Activity continues
    fun doSomething() {
        // 在示例中启动了 10 个协程,且每个都工作了不同的时长
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束


fun main() = runBlocking<Unit> {
    val activity = MainActivity()
    activity.doSomething() // 运行测试函数
    println("Launched coroutines")
    delay(500L) // 延迟半秒钟
    println("Destroying activity!")
    activity.destroy() // 取消所有的协程
    delay(1000) // 为了在视觉上确认它们没有工作
}
复制代码

运行结果:

Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!
复制代码

这一节的内容和父协程的章节内容差不多意思,只是从不同的角度来讲解的。

线程局部数据

我们使用java编码的时候,对于每个线程的局部变量可以使用ThreadLocal来实现,而协程也有相关的概念。

有时,能够将一些线程本地数据传递到协程或协程之间是很方便的。但是,由于它们没有绑定到任何特定的线程,如果手工执行,这可能会导致数据丢失。

Sometimes it is convenient to have an ability to pass some thread-local data to or between coroutines. However, since they are not bound to any particular thread, this will likely lead to boilerplate if done manually.

ThreadLocalasContextElement 扩展函数在这里会充当救兵。它创建了额外的上下文元素, 且保留给定 ThreadLocal 的值,并在每次协程切换其上下文时恢复它。

val threadLocal = ThreadLocal<String?>() // 声明线程局部变量
fun main() = runBlocking<Unit> {
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}
复制代码

运行结果:

Pre-main, current thread: Thread[main,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1,5,main], thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-1,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main,5,main], thread local value: 'main'
复制代码

很容易忘记设置相应的上下文元素。如果运行协程的线程不同,那么从协程访问的线程本地变量可能会有一个意外值。为了避免这种情况,建议使用ensurepresentation方法和故障快速对不正确的用法。

ThreadLocal具有一流的支持,可以与任何基本的kotlinx一起使用。协同程序提供。但是,它有一个关键的限制:当线程本地发生变化时,一个新值不会传播到coroutine调用者(因为上下文元素不能跟踪所有ThreadLocal对象访问),并且更新后的值在下一个暂停中丢失。使用withContext更新协同程序中的线程本地值,请参阅asContextElement了解更多细节。

异常处理

这部分内容包括异常处理以及取消异常。

异常的传播

我们在使用编码中遇到异常一般是可以就地try/catch或者使用注解@Throws向调用方法的时候去处理,后者就是一种异常的传播。但是在协程中,根据协程构建器的不同,也分为两种情况:

自动的传播异常(launch 以及 actor) 或者将它们暴露给用户(async 以及 produce)。 前者对待异常是不处理的,类似于 Java 的 Thread.uncaughtExceptionHandler, 而后者依赖用户来最终消耗异常,比如说,通过 await 或 receive (produce 以及 receive 在通道中介绍过) 示例:

fun main() = runBlocking<Unit> {
    val job = GlobalScope.launch {
        println("Throwing exception from launch")
        throw IndexOutOfBoundsException() // 我们将在控制台打印 Thread.defaultUncaughtExceptionHandler
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async {
        println("Throwing exception from async")
        throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}
复制代码

运行结果:

Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-1" java.lang.IndexOutOfBoundsException
	at com.vincent.coroutinesapplication.MainActivityKt$main$1$job$1.invokeSuspend(MainActivity.kt:30)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)
Joined failed job
Throwing exception from async
Caught ArithmeticException
复制代码

因为是异步,所以就算前面已经运行出错了,但是还是没有影响到后面协程运行。这一点和我们平时看到的代码有些不一样。

CoroutineExceptionHandler

在 JVM 中可以重定义一个全局的异常处理者来将所有的协程通过 ServiceLoader 注册到 CoroutineExceptionHandler。 全局异常处理者就如同 Thread.defaultUncaughtExceptionHandler 一样,在没有更多的指定的异常处理者被注册的时候被使用。 在 Android 中, uncaughtExceptionPreHandler 被设置在全局协程异常处理者中。 他的主要作用是将异常统一处理,比如保存到SD

示例:

fun main() = runBlocking<Unit> {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    val job = GlobalScope.launch(handler) {
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) {
        throw ArithmeticException() // 没有打印任何东西,依赖用户去调用 deferred.await()
    }
    joinAll(job, deferred)
}
复制代码

运行结果:

Caught java.lang.AssertionError
复制代码

取消与异常

协程内部使用 CancellationException来进行取消,但是这个异常会被所有的处理者忽略,所以取消和异常是紧密相关的。 我们看看示例:

fun main() = runBlocking<Unit> {
    val job = launch {
        val child = launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                println("Child is cancelled")
            }
        }
        yield()
        println("Cancelling child")
        child.cancel()
        child.join()
        yield()
        println("Parent is not cancelled")
    }
    job.join()
}
复制代码

运行结果:

Cancelling child
Child is cancelled
Parent is not cancelled
复制代码

再看一个示例:

fun main() = runBlocking<Unit> {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    val job = GlobalScope.launch(handler) {
        launch { // 第一个子协程
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    println("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    println("The first child finished its non cancellable block")
                }
            }
        }
        launch { // 第二个子协程
            delay(10)
            println("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()
}
复制代码

运行结果:

Second child throws an exception
Children are cancelled, but exception is not handled until all children terminate
The first child finished its non cancellable block
Caught java.lang.ArithmeticException
复制代码

对比两个运行结果,得到的结论是:

  1. 当一个协程在没有任何理由的情况下使用 Job.cancel取消的时候,它会被终止,但是它不会取消它的父协程。 无理由取消是父协程取消其子协程而非取消其自身的机制。

  2. 如果协程遇到除CancellationException以外的异常,它将取消具有该异常的父协程,并且这种异常是无法被CoroutineExceptionHandler回收的

异常聚合

上面说到如果协程遇到除CancellationException以外的异常,它将取消具有该异常的父协程,那如果父协程有多个子协程应该怎么办呢?

通常的规则是“第一个异常赢得了胜利”,所以第一个被抛出的异常将会暴露给处理者。 但也许这会是异常丢失的原因,比如说一个协程在 finally块中抛出了一个异常。 这时,多余的异常将会被压制。

但是在JDK7还有其它处理办法,示例:

fun main() = runBlocking<Unit> {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw ArithmeticException()
            }
        }
        launch {
            delay(100)
            throw IOException()
        }
        delay(Long.MAX_VALUE)
    }
    job.join()
}
复制代码

运行结果:

Caught java.io.IOException with suppressed [java.lang.ArithmeticException]
复制代码

监督

取消是一种双向机制,在协程的整个层次结构之间传播。但是如果需要单向取消怎么办?

  • 监督作业

先看示例:

fun main() = runBlocking<Unit> {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // 启动第一个子作业——这个示例将会忽略它的异常(不要在实践中这么做!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            println("First child is failing")
            throw AssertionError("First child is cancelled")
        }
        // 启动第两个子作业
        val secondChild = launch {
            firstChild.join()
            // 取消了第一个子作业且没有传播给第二个子作业
            println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // 但是取消了监督的传播
                println("Second child is cancelled because supervisor is cancelled")
            }
        }
        // 等待直到第一个子作业失败且执行完成
        firstChild.join()
        println("Cancelling supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}
复制代码

运行结果:

First child is failing
First child is cancelled: true, but second one is still active
Cancelling supervisor
Second child is cancelled because supervisor is cancelled
复制代码

结果分析:用书面术语就是SupervisorJob的取消只会向下传播,没有影响到父协程。白话就是如果协程遇到除CancellationException以外的异常,它取消具有该异常的父协程是可以避免的,方法就是使用SupervisorJob替代常规的 Job

  • 监督作用域 此处就是将上面的这种效果划定协程范围,方法就是使用supervisorScope替代coroutineScope。看示例代码:
fun main() = runBlocking<Unit> {
    try {
        supervisorScope {
            val child = launch {
                try {
                    println("Child is sleeping")
                    delay(Long.MAX_VALUE)
                } finally {
                    println("Child is cancelled")
                }
            }
            // 使用 yield 来给我们的子作业一个机会来执行打印
            yield()
            println("Throwing exception from scope")
            throw AssertionError()
        }
    } catch(e: AssertionError) {
        println("Caught assertion error")
    }
}
复制代码

运行结果:

Child is sleeping
Throwing exception from scope
Child is cancelled
Caught assertion error
复制代码
  • 监督协程中的异常

常规的作业和监督作业之间的另一个重要区别是异常处理。 每一个子作业应该通过异常处理机制处理自身的异常。 这种差异来自于子作业的执行失败不会传播给它的父作业的事实。

这里其实就是对监督的异常再次做了异常强调,如果对监督这个概念觉得抽象,我们就再看看上面的这句话:取消是一种双向机制,在协程的整个层次结构之间传播。这里的监督指的就是协程取消的单向传递,不明白的再看看最后一个示例:

fun main() = runBlocking<Unit> {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    supervisorScope {
        val child = launch(handler) {
            println("Child throws an exception")
            throw AssertionError()
        }
        println("Scope is completing")
    }
    println("Scope is completed")
}
复制代码

运行结果:

Scope is completing
Child throws an exception
Caught java.lang.AssertionError
Scope is completed
复制代码

如果到这里还是不明白,那你就把SupervisorJob换成常规的Job,并把coroutineScope换成supervisorScope看看运行结果就明白是怎么回事了!

select 表达式 (实验性的)

select表达式可以同时等待多个挂起函数,并 选择 第一个可用的。 注意:

Select表达式在kotlinx.coroutines中是一个实验性的特性。这些APIkotlinx.coroutines库即将到来的更新中可能会发生改变。 表现形式:

fun CoroutineScope.fizz() = produce<String> {
    while (true) { // 每 300 毫秒发送一个 "Fizz"
        delay(300)
        send("Fizz")
    }
}

fun CoroutineScope.buzz() = produce<String> {
    while (true) { // 每 500 毫秒发送一个 "Buzz!"
        delay(500)
        send("Buzz!")
    }
}

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> 意味着该 select 表达式不返回任何结果
     
    }
}
复制代码

在通道中 select

使用 receive挂起函数,我们可以从两个通道接收 其中一个 的数据。 但是select表达式允许我们使用其 onReceive子句 同时 从两者接收:

通道关闭时 select

select中的onReceive子句在已经关闭的通道执行会发生失败,并导致相应的 select 抛出异常。我们可以使用 onReceiveOrNull子句在关闭通道时执行特定操作。

现在我们来看看select通道示例:

fun main() = runBlocking<Unit> {
    val fizz = fizz()
    val buzz = buzz()
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
    coroutineContext.cancelChildren() // 取消 fizz 和 buzz 协程 
}

fun CoroutineScope.fizz() = produce<String> {
    while (true) { // 每 300 毫秒发送一个 "Fizz"
        delay(300)
        send("Fizz")
    }
}

fun CoroutineScope.buzz() = produce<String> {
    while (true) { // 每 500 毫秒发送一个 "Buzz!"
        delay(500)
        send("Buzz!")
    }
}

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> 意味着该 select 表达式不返回任何结果
        fizz.onReceive { value ->  // 这是第一个 select 子句
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // 这是第二个 select 子句
            println("buzz -> '$value'")
        }
    }
}
复制代码

运行结果:

fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
复制代码

从运行结果可以看到, select 表达式允许我们使用onReceive 子句,同时从两者接收数据。select中的onReceive 子句在已经关闭的通道执行会发生失败,并导致相应的 select抛出异常。我们可以使用 onReceiveOrNull 子句在关闭通道时执行特定操作。


复制代码

结果分析:

首先,select 偏向于 第一个子句,当可以同时选到多个子句时, 第一个子句将被选中。在这里,两个通道都在不断地生成字符串,因此 a 通道作为 select 中的第一个子句获胜。然而因为我们使用的是无缓冲通道,所以 a 在其调用 send 时会不时地被挂起,进而 b 也有机会发送。

第二个观察结果是,当通道已经关闭时, 会立即选择onReceiveOrNull

但是注意:

onReceiveOrNull是一个扩展函数,仅为具有非空元素的通道定义,因此在封闭通道和空值之间不会发生意外混淆。

Note that onReceiveOrNull is an extension function defined only for channels with non-nullable elements so that there is no accidental confusion between a closed channel and a null value.

Select 以发送

当主通道上的消费者无法跟上它时,它会将值发送到 side(非主通道) 上 看不懂的话,我们看看一个示例:

fun main() = runBlocking<Unit> {
    val side = Channel<Int>() // 分配 side 通道
    launch { // 对于 side 通道来说,这是一个很快的消费者
        side.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(side).consumeEach {
        println("Consuming $it")
        delay(250) // 不要着急,让我们正确消化消耗被发送来的数字
    }
    println("Done consuming")
    coroutineContext.cancelChildren()
}

fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // 生产从 1 到 10 的 10 个数值
        delay(100) // 延迟 100 毫秒
        select<Unit> {
            onSend(num) {} // 发送到主通道
            side.onSend(num) {} // 或者发送到 side 通道
        }
    }
}
复制代码

运行结果:

Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
复制代码

Select 延迟值

金句:延迟值可以使用 onAwait 子句查询。 看看示例就明白了:

fun main() = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}

fun CoroutineScope.asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}

fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}
复制代码

运行结果:

Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
复制代码

结果分析: 就是我通过异步,生成了一个12个字符串的集合,然后输出了第一个字符串以后统计仍处于激活状态的延迟值的数量。代码看起来很抽象,其实说起来很简单,不懂的话再看看示例吧!

在延迟值通道上切换

示例:

fun main() = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // 测试使用的通道
    launch { // 启动打印协程
        for (s in switchMapDeferreds(chan))
            println(s) // 打印每个获得的字符串
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // 充足的时间来生产 "BEGIN"
    chan.send(asyncString("Slow", 500))
    delay(100) // 不充足的时间来生产 "Slow"
    chan.send(asyncString("Replace", 100))
    delay(500) // 在最后一个前给它一点时间
    chan.send(asyncString("END", 500))
    delay(1000) // 给执行一段时间
    chan.close() // 关闭通道……
    delay(500) // 然后等待一段时间来让它结束
}

fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // 从第一个接收到的延迟值开始
    while (isActive) { // 循环直到被取消或关闭
        val next = select<Deferred<String>?> { // 从这个 select 中返回下一个延迟值或 null
            input.onReceiveOrNull { update ->
                update // 替换下一个要等待的值
            }
            current.onAwait { value ->
                send(value) // 发送当前延迟生成的值
                input.receiveOrNull() // 然后使用从输入通道得到的下一个延迟值
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // 跳出循环
        } else {
            current = next
        }
    }
}

fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    str
}
复制代码

运行结果:

BEGIN
Replace
END
Channel was closed
复制代码

这一节仔细看了半天,没有理解到延迟通道上的切换指的是异步返回的值切换还是什么?从运行结果来看,第二个字符串被第三个字符串替代了,但是没有明白编码的时候,这个点应该用在什么业务? 从介绍来看,也只能是这一句——我们现在来编写一个通道生产者函数,它消费一个产生延迟字符串的通道,并等待每个接收的延迟值,但它只在下一个延迟值到达或者通道关闭之前处于运行状态,但是对于编码的作用还有待研究。

共享的可变状态与并发

本节解决的问题是多线程中常见的难题:数据安全性,即多个线程共同访问一个变量时,这个变量的安全性将得不到保证,比如下面的示例,当并发期间编辑counter将会被多个线程同时访问,导致最后的结果与期望不一致:

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
@Volatile
var counter = 0
复制代码

运行结果:

Completed 100000 actions in 34 ms
Counter = 87104
复制代码

我们最后期望的结果应该是100000,但最终差距很远,那么如何保证线程安全呢? 下面就是几种解决方案。

线程安全的数据结构

类似我们使用StringBufferStringBuilder,在多线程的时候,我们也需要使用线程安全的数据类型,比如AtomicInteger,我们看一下示例:

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

var counter = AtomicInteger()
复制代码

运行结果:

Completed 100000 actions in 78 ms
Counter = 100000
复制代码

但是注意

这是针对此类特定问题的最快解决方案。它适用于普通计数器、集合、队列和其他标准数据结构以及它们的基本操作。然而,它并不容易被扩展来应对复杂状态、或一些没有现成的线程安全实现的复杂操作。

以细粒度限制线程

限制线程 是解决共享可变状态问题的一种方案:对特定共享状态的所有访问权都限制在单个线程中。

用白话来解释就是,给业务添加了一个"Synchronize",使之线程访问被单独添加了一个单例的线程,已达到“锁住”的效果。 来看看具体示例:

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        massiveRun {
            // 将每次自增限制在单线程上下文中
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
复制代码

运行结果:

Completed 100000 actions in 414 ms
Counter = 100000
复制代码

注意:细粒度线程控制同Synchronize一样影响性能,文档是这样解释其中的原因:这段代码运行非常缓慢,因为它进行了 细粒度 的线程限制。每个增量操作都得使用 [withContext(counterContext)] 块从多线程 Dispatchers.Default 上下文切换到单线程上下文。

以粗粒度限制线程

此处相当于是将锁方法内代码块改为直接锁方法,这个熟悉Synchronize朋友应该熟悉,不信的话我们继续看看示例:

fun main() = runBlocking<Unit> {
    // 将一切都限制在单线程上下文中
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
复制代码

运行结果:

Completed 100000 actions in 25 ms
Counter = 100000
复制代码

互斥

所谓互斥,类似于Java中的wait/notify,只是协程统一封装在Mutex,具体的看看示例:

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        massiveRun {
            // 用锁保护每次自增
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同一动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

val mutex = Mutex()
var counter = 0
复制代码

运行结果:

Completed 100000 actions in 297 ms
Counter = 100000
复制代码

最后文档还是提了一下关于性能的提醒:

因此会付出一些代价。但是对于某些必须定期修改共享状态的场景,它是一个不错的选择,但是没有自然线程可以限制此状态。

Actors

简单一句话解释:一个 actor 是由协程、 被限制并封装到该协程中的状态以及一个与其它协程通信的通道组合而成的一个实体,看不懂的话,可以理解为这个是多线程封装最好的一个对象,那它如何使用呢? 我们来看看示例:

fun main() = runBlocking<Unit> {
    val counter = counterActor() // 创建该 actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // 发送一条消息以用来从一个 actor 中获取计数值
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // 关闭该actor
}

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 启动的协程数量
    val k = 1000 // 每个协程重复执行同个动作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程的作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求

// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor 状态
    for (msg in channel) { // 即将到来消息的迭代器
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
复制代码

运行结果:

Completed 100000 actions in 311 ms
Counter = 100000
复制代码

ActorsMutex属于协程专有的一些特性,这些特性只有在项目中实际运用才能深刻理解其特性的具体效果和优势

三、总结

总算按时完成了之前立下的falg!这七天仔细把文档过了一遍,认真看了其中的知识点,但是并没有深刻理解协程是什么,只是有一个懵懵懂懂的概念。纸上得来终觉浅,绝知此事要躬行,接下来准备继续研究一下这方面的demo,然后再回头看看码上开学关于协程的讲解,这就是我的一个大概学习计划了!


由于上面的内容很多东西都是自己的理解,只是一个参考答案,如果各位有不同意见,欢迎点评,我们一起进步!

官网协程文档

关注下面的标签,发现更多相似文章
评论