阅读 1783

Kotlin 基础 | 为什么要这样用协程?

协程最大的好处是让异步代码更符合线性思维。脑袋里怎么想,直接写下来就好,而不用考虑线程切换,也不再写出在回调中跳来跳去的代码。协程代码的可读性、理解难度是 RxJava 望城莫及的。

看上去连续的一段代码,执行起来却走走停停,不同的子代码段还可能执行在不同的线程上。协程就是用这种方式来实现异步

异步

最开始,在没有协程和各种异步工具时,只能这样实现异步:

// 构建主线程 Handler
val mainHandler = Handler(Looper.getMainLooper())
// 启动新线程
val handlerThread = HandlerThread("user")
handlerThread.start()
// 构建新线程 Handler
val handler = Handler(handlerThread.looper)
// 把"拉取用户信息"通过 Handler 发送到新线程执行
handler.post(object : Runnable {
    override fun run() {
        val user = fetchUser() //执行在新线程
        // 把用户信息通过 Handler 发送到主线程执行
        mainHandler.post(object : Runnable {
            override fun run() {
                tvName.text = user.name //执行在主线程
            }
        })
    }
})
Log.v("test", "after post") // 会立刻打印(主线程不被阻塞)

fun fetchUser(): User {
    Thread.sleep(1000) //模拟网络请求
    return User("taylor", 20, 0)
}
复制代码

这段代码从网络获取用户数据并显示在控件上。

代码的不同部分会执行在不同线程上:拉取用户信息的耗时操作会在handlerThread线程中执行,而界面显示逻辑在主线程。

这两个线程间步调不同(异步),即互不等待对方执行完毕再执行自己的后续代码(不阻塞)。它们通过进程间互发消息实现了异步。

这样写的缺点是在同一层次中暴露太多细节!构建并启动线程的细节、线程切换的细节、线程通信的细节、网络请求的细节。这些本该被隐藏的细节统统在业务层被铺开。

若改用RxJava就可以屏蔽这些细节:

userApi.fetchUser()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(
            { user ->
                tvName.text = user.name
            },
            { error ->
                Log.e("error","no user")
            }
        )
复制代码

RxJava 帮我们切换到 IO 线程做网路请求,再切换回主线程展示界面。线程间通信方式也从发消息变为回调。代码可读性瞬间提升。

若需求改成“获取用户信息后再根据用户 ID 获取其消费流水”,就得使用flatMap()将两个请求串联起来,此时不可避免地出现嵌套回调,代码可读性下降。

协程

若用协程,就可以像写同步代码一样写异步代码:

launch()

class TestActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.text)

        // 启动顶层协程
        GlobalScope.launch {
            // 拉取用户信息(挂起点)
            val user = fetchUser()
            // 拉取用户账单(挂起点)
            val bills = fetchBill(user)
            // 展示用户账单(UI操作)
            showBill(bills)
        }
        Log.v("test", "after launch") // 立刻打印(主线程不被阻塞)
    }

    // 挂起方法
    suspend fun fetchUser(): User {
        delay(1000) // 模拟网络请求
        return User("taylor", 20, 0)
    }

    // 挂起方法
    suspend fun fetchBill(user: User): List<Bill> {
        delay(2000) // 模拟网络请求
        return mutableListOf(Bill("Tmall", 10), Bill("JD", 20))
    }
}
复制代码

GlobalScope.launch()启动了一个协程,主线程不会被阻塞(“after launch”会立即打印)。其中GlobalScopeCoroutineScope的一个实现。

CoroutineScope称为 协程领域,它是协程中最顶层的概念,所有的协程都直接或间接的依附于它 ,它用于描述协程的归属,定义如下:

// 协程领域
public interface CoroutineScope {
    // 协程上下文
    public val coroutineContext: CoroutineContext
}

// 协程领域的静态实现:顶层领域
public object GlobalScope : CoroutineScope {
    // 空上下文
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
复制代码

协程领域 持有CoroutineContext

CoroutineContext称为 协程上下文 ,它是“和协程执行相关的一系列元素的集合”,其中最重要的两个是CoroutineDispatcher(描述协程代码分发到哪个线程执行)和Job(代表着协程本身)。

协程领域 有一个静态实现GlobalScope,它用于创建顶层协程,即其生命周期同 App 一致。

协程的启动方法被定义成CoroutineScope的扩展方法:

/**
 * 启动一个新协程,它的执行不会阻塞当前线程。默认情况下,协程会被立即执行。
 *
 * @param context 在原有协程上下文基础上附加的上下文
 * @param start 协程启动选项
 * @param block 协程体,它会在协程上下文指定的线程中执行
 **/
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,// 默认为空上下文
    start: CoroutineStart = CoroutineStart.DEFAULT, // 默认启动选项
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
}
复制代码

启动协程时,必须提供参数block(协程体),即在协程中执行的代码段。

Demo 在协程体中先后调用了两个带suspend的方法。

suspend方法称为 挂起方法。挂起的对象是其所在协程,即协程体的执行被暂停。被暂停的执行点称为 挂起点,执行挂起点之后的代码称为 恢复

Demo 中有两个挂起点:在用户信息不返回之前,拉取账单就不会被执行,在拉取账单不返回之前,就不会把数据填充到列表中。

withContext()

执行下 Demo,看看效果:

android.view.ViewRootImpl$CalledFromWrongThreadException: 
    Only the original thread that created a view hierarchy can touch its views.
复制代码

崩溃原因是“展示账单逻辑被执行在非UI线程”。GlobalScope.launch()将协程体调度到新线程执行,执行完耗时操作后,UI 展示时还需要调度回主线程:

GlobalScope.launch {
    val user = fetchUser()
    val bills = fetchBill(user)
    withContext(Dispatchers.Main) {
        showBill(bills)
    }
}
复制代码

withContext()是一个顶层挂起方法:

public suspend fun <T> withContext(
    context: CoroutineContext,// 指定 block 被调度到哪个线程执行
    block: suspend CoroutineScope.() ->  // 被调度执行的代码段
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    ...
}
复制代码

它用于在协程中切换上下文(切换协程体执行的线程)。withContext()会挂起当前协程(它是一个挂起点),直到block执行完,协程才会在自己原先的线程上恢复执行后续代码。

async()

上面的例子是两个串行请求,如果换成“等待两个并行请求的结果”,可以这样写:

GlobalScope.launch {
    val a = async { fetchA() }
    val b = async { fetchB() }
    a.await() // 挂起点
    b.await() // 挂起点
    Log.v("test","result=${a+b}")// 当两个网络请求都返回后才会打印
}

suspend fun fetchA(): String {
    ...// 网络请求
}

suspend fun fetchB(): String {
    ...// 网络请求
}
复制代码

在顶层协程中又调用async()启动了2个子协程:

// 启动协程,并返回协程执行结果
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    ...
}
复制代码

aync()也是CoroutineScope的扩展方法,和launch()唯一的不同是它引入了泛型,用于描述协程体执行的结果,并将其包装成一个Deferred作为返回值:

public interface Deferred<out T> : Job {
    // 挂起方法: 等待值的计算,但不会阻塞当前线程,计算完成后恢复当前协程执行
    public suspend fun await(): T
}
复制代码

调用async()启动子协程不会挂起外层协程,而是立即返回一个Deferred对象,直到调用Deferred.await(),协程的执行才会被挂起。当协程在多个Deferred对象上被挂起时,只有当它们都恢复后,协程才继续执行。这样就实现了“等待多个并行的异步结果”。

coroutineScope()

如果多个并行的异步操作没有返回值,如何等待它们都执行完毕?

GlobalScope.launch {
    // 挂起外层协程
    coroutineScope { // 和外层协程体执行在同一个线程中
        launch { updateCache() }
        launch { insertDb() }
    }
    Log.v("test", "after coroutineScope()") // 被coroutineScope阻塞,等其执行完毕才打印
}

suspend fun updateCache() {
    ...// 更新内存缓存
}

suspend fun insertDb() {
    ...// 插入数据库
}
复制代码

coroutineScope()创建了一个协程并阻塞当前协程,在其中调用launch()创建了2个子协程,只有当2个子协程都执行完毕后才会打印 log。

coroutineScope()声明如下:

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
    ...
}
复制代码

coroutineScope()有如下特点:

  1. 返回计算结果
  2. 阻塞当前协程
  3. 执行在和父协程相同的线程中
  4. 它等待所有子协程执行完毕

coroutineScope()withContext() 的一种情况,当给withContext()传入当前协程上下文时,它和 coroutineScope() 一模一样。它也会返回计算结果,也会阻塞当前线程,也会等待所有子协程执行完毕。

换句话说,coroutineScope() 是 不进行线程调度的 withContext()

GlobalScope的罪过

虽然上面这些代码都是正确的,但它们不该出现在真实项目中。

因为它们都使用GlobalScope.launch()来启动协程。这样做会让管理协程变得困难:

GlobalScope.launch()构建的协程是独立的,它不隶属于任何CoroutineScope。而且是静态的,所以生命周期和 APP 一致。

一旦被创建则要等到 APP 关闭时才会释放线程资源。若在短生命周期的业务界面使用,需纯手动管理生命周期,不能享受structured-concurrency

structured-concurrency 是一种并发编程范式,它是管理多线程并发执行生命周期的一种方式,它要求“执行单元”的孵化要有结构性,即新建的“执行单元”必须依附于一个更大的“执行单元”。这样就便于管理(同步)所有执行单元的生命周期。

Kotlin 协程实现了该范式,具体表现为:

  1. 新建协程必须隶属于一个CoroutineScope,新协程的Job也就成为CoroutineScope的子Job
  2. Job被结束时,所有子Job立马被结束(即使还未执行完)。
  3. Job会等待所有子协程都结束了才结束自己。
  4. Job抛出异常时,会通知父Job,父Job将其他所有子Job都结束。

先看一个手动管理协程生命周期的例子:如果一个 Activity 所有的协程都通过GlobalScope.launch()启动,那在 Activity 退出时,该如何取消这些协程?

办法还是有的,只要在每次启动协程时保存其Job的引用,然后在Activity.onDestroy()时遍历所有Job并逐个取消:

class TestActivity : AppCompatActivity(){
    // 持有该界面中所有启动协程的引用
    private var jobs = mutableListOf<Job>()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // 启动顶层协程并保存其引用
        GlobalScope.launch {
            ...
        }.also { jobs.add(it) }
    }
    
    override fun onMessageReceive(msg: Message) {
        // 启动顶层协程并保存其引用
        GlobalScope.launch {
            ...
        }.also { jobs.add(it) }
    }
    
    override fun onDestroy() {
        super.onDestroy()
        // 将所有协程都取消以释放资源
        jobs.forEach { it.cancel() }
    }
}
复制代码

每一个GlobalScope.launch()都是独立的,且它不隶属于任何一个CoroutineScope。为了管理它们就必须持有每个启动协程的引用,并逐个手动释放资源。

若使用structured-concurrency范式就可以让管理变简单:

class TestActivity : AppCompatActivity(), CoroutineScope by MainScope() {{
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        launch { ... }
    }
    
    override fun onMessageReceive(msg: Message) {
        launch { ... }
    }
    
    override fun onDestroy() {
        super.onDestroy()
        cancel() 
    }
}
复制代码

Activity 实现了CoroutineScope接口并将其委托给MainScope():

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
复制代码

MainScope()是一个顶层方法,它新建了一个ContextScope实例,并为其指定上下文,其中一个是Dispatchers.Main,它是系统预定义的主线程调度器,这意味着,MainScope中启动的协程体都会被调度到主线程执行。

launch()cancel()都是 CoroutineScope的扩展方法,而 Activity 实现了该接口并委托给 MainScope。所以 Demo 中通过launch()启动的协程都隶属于 MainScope,onDestroy()中调用的cancel()取消了 MainScope 的 Job,它的所有子Job也一同被取消。

Activity 被创建的时CoroutineScope同时被实例化,在 Activity 被销毁时,所有的协程也被销毁,实现了协程和生命周期对象绑定。 再也不用担心后台任务完成后更新界面时,因 Activity 已销毁报空指针了。

协程可以和任何具有生命周期的对象绑定,比如 View,只有当 View 依附于界面时其对应的协程任务才有意义,所以当它与界面解绑时应该取消协程:

// 为 Job 扩展方法
fun Job.autoDispose(view: View) {
    // 判断传入 View 是否依附于界面
    val isAttached = Build.VERSION.SDK_INT >= Build.VERSION_CODES.KITKAT && view.isAttachedToWindow || view.windowToken != null
    // 如果 View 已脱离界面则直接取消对应协程
    if (!isAttached) {
        cancel()
    }

    // 构建 View 和界面绑定关系监听器
    val listener = object : View.OnAttachStateChangeListener {
        // 当 View 和界面解绑时,取消协程
        override fun onViewDetachedFromWindow(v: View?) {
            cancel()
            v?.removeOnAttachStateChangeListener(this)
        }

        override fun onViewAttachedToWindow(v: View?) = Unit
    }

    // 为 View 设置监听器
    view.addOnAttachStateChangeListener(listener)
    // 当协程执行完毕时,移除监听器
    invokeOnCompletion {
        view.removeOnAttachStateChangeListener(listener)
    }
}
复制代码

然后就可以像这样使用:

launch {
    // 加载图片
}.autoDispose(imageView)
复制代码

GlobalScope无法和任何生命周期对象绑定(除 App 生命周期),除了这个缺点外,还有一个:

coroutineScope {
    GlobalScope.launch {
        queryA()
    }
    GlobalScope.launch {
        queryB()
    }
}
复制代码

queryB()抛出异常时,queryA()不会被取消。因为它们是通过GlobalScope.launch()启动的,它们是独立的,不隶属于外层coroutineScope

但若换成下面这种方式,queryA()就会被取消:

coroutineScope {
    launch {
        queryA()
    }
    launch {
        queryB()
    }
}
复制代码

因为这里的launch()都是外层coroutineScope对象上的调用,所以它们都隶属于该对象。当子协程抛出异常时,父协程会受到通知并取消掉所有其他子协程。

viewModelScope

上一节的代码虽然是正确的,但依然不该出现在真实项目中。因为 Activity 属于View层 ,只应该包含和 UI 相关的代码,启动协程执行异步操作这样的细节不该在这层暴露。(架构相关的详细讨论可以点击我是怎么把业务代码越写越复杂的 | MVP - MVVM - Clean Architecture

真实项目中,协程更有可能在ViewModel层出现。只要引入 ViewModel Kotlin 版本的包就可以轻松地在ViewModel访问到CoroutineScope

implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0-alpha03"  
复制代码
class MainViewModel : ViewModel() {
    fun fetchBean() {
        // 系统为 ViewModel 预定义的 CoroutineScope
        viewModelScope.launch { 
            ...
        }
    }
}  
复制代码

viewModelScope被定义成ViewModel的扩展属性,这种扩展手法颇为巧妙,限于篇幅原因,准备单独写一篇详细分析。

疑惑

这篇仅粗略地介绍了协程相关的概念、协程的使用方式,及注意事项。依然留了很多疑惑,比如:

  1. 为啥要设定CoroutineScope这个角色?启动协程为啥要定义成CoroutineScope的扩展函数?
  2. CoroutineContext的内部结构是怎么样的?为啥要这样设计?
  3. 协程是如何将协程体中的代码调度到不同线程执行的?
  4. 协程是如何在挂起点恢复执行的?

下一篇将更加深入阅读源码,解答这些疑问。

推荐阅读

这是该系列的第十一篇,系列文章目录如下:

  1. Kotlin基础 | 白话文转文言文般的Kotlin常识

  2. Kotlin基础 | 望文生义的Kotlin集合操作

  3. Kotlin实战 | 用实战代码更深入地理解预定义扩展函数

  4. Kotlin实战 | 使用DSL构建结构化API去掉冗余的接口方法

  5. Kotlin基础 | 抽象属性的应用场景

  6. Kotlin进阶 | 动画代码太丑,用DSL动画库拯救,像说话一样写代码哟!

  7. Kotlin基础 | 用约定简化相亲

  8. Kotlin基础 | 2 = 12 ?泛型、类委托、重载运算符综合应用

  9. Kotlin实战 | 语法糖,总有一颗甜到你(持续更新)

  10. Kotlin 实战 | 干掉 findViewById 和 Activity 中的业务逻辑

  11. Kotlin基础 | 为什么要这样用协程?

  12. Kotlin 应用 | 用协程控制多个并行异步结果的优先级

  13. Kotlin实战 | 干掉 xml 再也不用为各种形状写一堆资源文件了

  14. Kotlin 进阶 | 不变型、协变、逆变