kotlin-coroutines 基础概念

5,024 阅读9分钟

Why

  • 简化异步代码的编写。
  • 执行严格主线程安全确保你的代码永远不会意外阻塞主线程,并增强了代码的可读性。
  • 提升代码安全性,内存泄漏更少。
  • 协程间通信。

What

协程的概念在编程语言的早期就出现了,在1967年Simula第一次使用协程。 协程就像非常轻量级的线程。 线程是由系统调度的,线程切换或线程阻塞的开销都比较大。而协程依赖于线程,但是协程挂起时不需要阻塞线程,几乎是无代价的,协程是由开发者控制的。所以协程也像用户态的线程,非常轻量级,一个线程中可以创建任意个协程。

但是kotlin的协程并不是真正的协程,只是对线程的再一次封装,让你更方便的像写同步代码一样编写异步代码。具体细节请参考:Kotlin 协程真的比 Java 线程更高效吗?

How

在Android开发中,经常遇到的问题:

  • Long running tasks
  • Main-safety
  • Leak work

Long running tasks

  • 一次CPU循环小于0.0000000004秒
  • 一次网络请求大约0.4秒

在Android中主线程主要用户UI的渲染和响应用户手势交互,以及轻量级的逻辑运算。若果在主线程发起一个请求,将会导致应用变慢、变卡、无法响应用户的交互,很容易造成ARN,用户体验极差。所以业界通行的做法是通过callback实现异步回调:

class ViewModel: ViewModel() {
   fun fetchDocs() {
       get("developer.android.com") { result ->
           show(result)
       }
    }
}

上面callback的示例只是一层回调的情况,假如有两个甚至更多的异步请求,而且存在下一个请求依赖上一个请求的结果,就会存在层层嵌套,当然目前比较流行的做法是用Retrofit的转换函数flatMap实现链式调用,但是代码看起来还是很臃肿。如果使用协程上面的代码可以简化成这样:

// Dispatchers.Main
suspend fun fetchDocs() {
    // Dispatchers.Main
    val result = get("developer.android.com")
    // Dispatchers.Main
    show(result)
}

suspend fun get(url: String) = withContext(Dispatchers.IO) {
    // Make a request
    // Dispatchers.IO
}

Coroutines提供一个很好途径可以简化耗时任务的代码编写,使得异步callback的代码可以像同步代码一样顺序编写。Coroutines在普通的方法上面加上两个新的操作。除了callreturn,Coroutines还增加 suspendresume。 协程使用栈帧管理当前运行的方法和方法的所有本地变量。当协程开始挂起,当前栈帧被复制并保存以供后续使用。当协程开始被恢复,栈帧将从它被保存的地方恢复回来,当前栈帧的方法继续执行。

suspend: 挂起函数

fun main() = runBlocking {
    val userId = mockRequest()
    println("userId: $userId in thread: ${Thread.currentThread().name}")
}

suspend fun mockRequest(): Int {
    println("mockRequest in thread: ${Thread.currentThread().name}")
    return getUserId()
}

suspend fun getUserId(): Int {
    return withContext(Dispatchers.IO) {
        println("getUserId in thread: ${Thread.currentThread().name}")
        Thread.sleep(10000)
        100
    }
}

Output:

mockRequest in thread: main
getUserId in thread: DefaultDispatcher-worker-1
userId: 100 in thread: main
  • suspend functions只能在协程或者suspend functions 中被调用。
  • 可以让函数体内的代码在指定的线程中(比如 withContext指定的io线程)运行,如果没有指定线程,那么它将在调用它的线程中运行。
  • 挂起函数之后的代码将被暂停执行,比如 println("userId: $userId in thread: ${Thread.currentThread().name}")
  • 当前线程会去执行其他的任务,如果是异步线程且没有任务可执行则释放到线程池。

挂起当前协程的执行,会将当前执行栈帧的所有本地变量和函数copy出来并保存。

resume

  • 当挂起函数执行完毕,自动切回到当前线程继续执行挂起函数之后的代码 println("userId: $userId in thread: ${Thread.currentThread().name}")
  • 一般情况下恢复执行时会自动切回当前线程,除非你的挂起函数使用的Dispatchers是Dispatchers.Unconfined。

Main-safety with coroutines

在Kotlin协程中,写的好的suspend functions 总是应该可以安全的从主线程被调用,也应该允许从任何线程被调用。使用suspend修饰的 function并不是告诉Kotlin这个方法在主线程运行。

为了写一个主线程安全的耗时方法,你可以让协程在Default 或者 IO dispatcher中执行(用withContext(Dispatchers.IO)指定在IO线程中运行)。在协程所有的协程必须运行在dispatcher中,即使他们运行在主线程中。Coroutines将会挂起自己,dispatcher知道如何恢复他们。

为了指定coroutines在什么线程运行,kotlin提供了四种Dispatchers:

Dispatchers用途使用场景
Dispatchers.Main主线程,和UI交互,执行轻量任务1.call suspend functions。2. call UI functions。 3. Update LiveData
Dispatchers.IO用于网络请求和文件访问1. Database。 2.Reading/writing files。3. Networking
Dispatchers.DefaultCPU密集型任务1. Sorting a list。 2.Parsing JSON。 3.DiffUtils
Dispatchers.Unconfined不限制任何制定线程高级调度器,不应该在常规代码里使用

假如你在Room中使用suspend functionsRxJavaLiveData,它自动提供了主线程安全。

// Dispatchers.Main
suspend fun fetchDocs() {
    // Dispatchers.Main
    val result = get("developer.android.com")
    // Dispatchers.Main
    show(result)
}
// Dispatchers.Main
suspend fun get(url: String) =
    // Dispatchers.IO
    withContext(Dispatchers.IO) {
        // Dispatchers.IO
        /* perform blocking network IO here */
    }
    // Dispatchers.Main

Leak work

你的程序里面可能会有成千上万个协程,你很难通过代码手动追踪它们,假如你通过代码手动追踪他们以确保它们完成或取消,那么代码会显得臃肿且很容易出错。如果代码不是很完美,可能会失去对coroutine的追踪,并导致任务泄露。任务泄露就像内存泄露,但是更严重。它不但会浪费内存的使用,还有cpu、磁盘,甚至会发起一个网络请求。

在android中,我们知道Activity和Fragment等都是有生命周期的,我们通常的模式是当前页面退出的时候,取消所有的异步任务。假如有一个异步的网络请求,在当前页面销毁的时候还在执行,会导致哪些问题:

  • 空指针异常。为请求结果回来之后去更新UI状态,而意外导致空指针异常。
  • 浪费内存资源。
  • 浪费CPU资源。

为了避免协程泄露,kotlin引入 结构化并发 。结构化并发是语言特性和最佳实践的组合,如果我们遵循最佳实践,将帮助追踪运行在协程中的任务。在Android中结构化并发可以帮我们做如下三件事:

  • 取消任务,当协程不再需要的时候。
  • 追踪任务,当协程运行的时候。
  • 传播错误信号,当协程执行失败的时候。

解决方式:

  • CoroutineScope 取消任务,其实是通过关联的job取消任务。
  • 任务追踪,coroutines的结构化并发通过coroutineScopesupervisorScope 保证 suspend function的所有任务完成才返回。
  • 传播错误信号coroutineScope 保证错误双向传递,只要有一个子coroutine失败或出现异常,异常往父域传递,并取消所有的子coroutines。而 supervisorScope 实现单向错误传递,适用于作业监控器。
CoroutineScope

在kotlin中所有的协程必须运行在CoroutineScope中,scope帮你追踪所有协程的状态,但是它不像Dispatcher,并不运行你的协程。它可以取消所有在里面启动的协程。启动一个新协程:

scope.launch {
    // This block starts a new coroutine 
    // "in" the scope.
    // 
    // It can call suspend functions
   fetchDocs()
}

创建CoroutineScope的常见方式如下:

  • CoroutineScope(context: CoroutineContext),api方法,如:val scope = CoroutineScope(Dispatchers.Main + Job()),或者如下:
class LifecycleCoroutineScope : CoroutineScope, Closeable {

    private val job = JobSupervisorJob()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job

    override fun close() {
        job.cancel()
    }
}
class SimpleRetrofitActivity : FragmentActivity() {
    private val activityScope = LifecycleCoroutineScope()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_simple_retrofit)

        // some other code ...
    }

    override fun onDestroy() {
        super.onDestroy()
        activityScope.close()
    }
    // some other code ...
}
  • coroutineScope:api方法,创建新一个子域,并管理域中的所有协程。注意这个方法只有在block中创建的所有子协程全部执行完毕后,才会退出。
  • supervisorScope:与 coroutineScope的区别是在子协程失败时,错误不会往上传递给父域,所以不会影响子协程。

创建协程的常见方式如下:

  • lauch:协程构建器,创建并启动(也可以延时启动)一个协程,返回一个Job,用于监督和取消任务,用于无返回值的场景。
  • async:协程构建器,和launch一样,区别是返回一个Job的子类 Deferred,唯一的区别是可以通过await获取完成时的返回值,或者捕获异常(异常处理也不一样)。

在Android中有一个kotlin的ViewModel的扩展库 lifecycle-viewmodel-ktx:2.1.0-alpha04,可以通过viewModelScope扩展属性启动协程,viewModelScope绑定了activity的生命周期,activity销毁的时候会自动取消在这个scope中启动的所有协程。

fun runForever() {
    // start a new coroutine in the ViewModel
    viewModelScope.launch {
        // cancelled when the ViewModel is cleared
        while(true) {
            delay(1_000)
            // do something every second
        }
    }
}

注意,协程的取消是协作的,当协程挂起的时候被取消将会抛一个 CancellationException,即使你捕获了这个异常,这个协程的状态也变为取消状态。假如你是一个计算协程,并且没有检查取消状态,那么这个协程不能被取消。

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
任务追踪

有时,我们希望两个或多个请求同时并发,并等待他们全部完成,suspend function 加上 coroutineScope 创建的子域可以保证全部子协程完成才返回。

suspend fun fetchTwoDocs() {
    coroutineScope {
        launch { fetchDoc(1) }
        async { fetchDoc(2) }
    }
}
传播错误信号

注意协程的结构化并发是基于语言特性加上最佳实践的,如下方式会导致,错误丢失:

val unrelatedScope = MainScope()
// example of a lost error
suspend fun lostError() {
    // async without structured concurrency
    unrelatedScope.async {
        throw InAsyncNoOneCanHearYou("except")
    }
}

上面代码丢失错误是因为 async的恢复需要调用await,这样才能将异常重新上传,而在suspend function 使用了另外一个协程域,导致lostError不会等待子作业的完成就退出了。正确的结构化并发:

suspend fun foundError() {
    coroutineScope {
        async { 
            throw StructuredConcurrencyWill("throw")
        }
    }
}

你可以通过CoroutineScope (注意是大写开头的C) 和 GlobalScope来创建 非结构化的协程,仅仅当你认为它的生命周期比调用者生命周期更长。

总结

  • CoroutineScope:协程作用域包含 CoroutineContext,用于启动协程,并追踪子协程,其实是通过Job追踪的。
    • 如果一个协程作用域被cancel了,那么它将无法在这个作用域内创建新的协程。
  • CoroutineContext:协程上下文,主要包含JobCoroutineDispatcher,表示一个协程的场景。
  • CoroutineDispatcher:协程调度器,决定协程所在的线程或线程池。它可以指定协程运行于特定的一个线程、一个线程池或者不指定任何线程。
  • Job:任务,封装了协程中需要执行的代码逻辑。Job 可以取消并且有简单生命周期,它有三种状态:isActiveisCompletedisCancelled
  • Deferred:Job的子类,有返回值的Job,通过await获取。
  • 协程构建器包括:lauchasync

示例

参考