[译] 在 Android 使用协程(part III) - 在实际工作中使用

1,411 阅读18分钟

这是一篇关于在 Android 上使用协程的系列文章之一。通过实现一次请求来解释使用协程中的实际问题是这篇文章的重点。

本系列的其他文章:

[译] 在 Android 使用协程(part I) - 协程的背景知识

[译] 在 Android 使用协程(part II) - 入门指南

使用协程解决实际问题

本系列的第 1 部分和第 2 部分重点介绍了如何使用协程来简化代码、在 Android 上提供主线程安全调用以及避免协程泄露。有了这个背景,协程看起来是一个既可以用于后台处理,又可以简化 Callback 的很好解决方案。

到目前为止,我们主要关注的是「什么是协程」以及「如何管理它们」。在这篇文章中,我们将看看如何使用它们来完成一些真正的任务。协程是一种通用的编程语言特性,与函数处于同一级别,因此,你可以使用它们来实现任何使用函数和对象实现的功能。然而,有两种类型的任务总是出现在实际代码中,协程是一种很好的解决方案:

  1. 一次性请求 它们总是在得到响应时认为请求完成了,所以每次调用时都会重新运行的请求
  2. 流请求 它们不会在得到第一个响应时就认为请求完成了,还会继续观察改变并将其报告给调用者

协程是这两个任务的一个很好的解决方案。在这篇文章中,我们将深入研究一次性请求,并探索如何在 Android 上用协程实现它们。

一次性请求

每次调用一个一次性请求都会执行一次,并在响应时完成。此模式与常规函数调用相同——被调用,执行一些操作,然后返回。由于与函数调用的相似性,它们往往比流请求更容易理解。

每次调用一个一次性请求时都会执行一次。一旦得到响应,就停止执行。

对于一次性请求的示例,请考虑浏览器如何加载此页面。当你点击到这篇文章的链接时,浏览器向服务器发送了一个网络请求来加载页面。一旦页面被传输到你的浏览器,它就停止与后端通信——它已经获取到需要的所有数据。如果服务器修改了这篇文章,除非你刷新页面否则新的修改将不会显示在浏览器中。

因此,虽然它们缺乏流请求的实时推送功能,但一次性请求仍旧非常强大。在 Android 应用中,有很多事情可以通过一次性请求来解决,比如获取、存储或更新数据。对于排序列表之类的事情,它也是一种很好的模式。

问题:显示已排序的列表

让我们通过查看如何显示排序列表来研究一次性请求。为了让示例更加具体,我们构建了一个「存货清单」的应用,供商店员工使用。它将用于根据产品最后一次进货的时间查找产品——他们希望能够对列表进行升序和降序排序。因为有很多产品,排序产品可能需要一秒钟,所以我们将使用协程来避免阻塞主线程!

在这个应用中,所有的产品都存储在一个 Room 数据库中。这是一个很好的用例,因为它不需要涉及网络请求,所以我们可以关注模式。尽管这个示例比较简单,因为它不使用网络,但是它展示了实现一次性请求所需的模式。

要使用协程实现这个请求,你将把协程引入到 ViewModel、Repository 和 Dao。让我们逐个浏览一下,看看如何将它们与协程集成在一起。

class ProductsViewModel(val productsRepository: ProductsRepository): ViewModel() {
   val _sortedProducts = MutableLiveData<List<ProductListing>>()
   val sortedProducts: LiveData<List<ProductListing>> = _sortedProducts

   /**
    * 当用户点击 sort 按钮时调用,由 UI 层调用
    */
   fun onSortAscending() = sortPricesBy(ascending = true)
   fun onSortDescending() = sortPricesBy(ascending = false)

   private fun sortPricesBy(ascending: Boolean) {
       viewModelScope.launch {
           // 挂起和恢复使这个数据库请求确保主线程安全
           // 所以我们的 ViewModel 不需要担心线程
           _sortedProducts.value =
                   productsRepository.loadSortedProducts(ascending)
       }
   }
}

ProductsViewModel 负责从 UI 层接收事件,然后向存储库请求更新后的数据。它使用 LiveData 保存当前已排序的列表,以便让 UI 显示。当 sortProductsBy 接收一个新事件时,启动一个新的协程来对列表进行排序,并在响应时更新 LiveData。ViewModel 通常是这个体系结构中启动大多数协程的正确位置,因为它可以在 onCleared中取消协程。如果用户离开界面,它们通常不再需要工作。

如果你还不经常使用 LiveData,请查看 @CeruleanOtter发布的这篇很棒的文章,它介绍了如何为 UI 存储数据.

一个简单的 ViewModel 示例 (ViewModels : A Simple Example)

这是 Android 上协程的一般模式。由于 Android 框架不能调用挂起函数,因此你需要配合一个协程来响应 UI 事件。最简单的办法是事件发生时启动一个新的协程,而在 ViewModel 做这件事比较合适。

在 ViewModel 中启动协程作为一般模式。

ViewModel 使用 ProductsRepository 来实际获取数据。来看它是这样做的:

class ProductsRepository(val productsDao: ProductsDao) {

  /**
    * 这是一个"常规"挂起函数,这意味着调用者必须处于一个协程中。存储层不负责启动或
    * 停止协程,因为它没有一个合适的生命周期来取消不必要的工作。
    * 这可以是从 Dispatchers.Main 调用的,而且是主线程安全的,因为 Room 将为我们负责
    * 主线程安全。
    */
   suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
       return if (ascending) {
           productsDao.loadProductsByDateStockedAscending()
       } else {
           productsDao.loadProductsByDateStockedDescending()
       }
   }
}

ProductsRepository 为产品交互提供了一个合理的接口。在这个应用程序中,由于所有内容都在本地的 Room 数据库中,所以他只是为 @Dao 提供了一个很好的接口,对于不同的排序,@Dao 有两个不同的函数。

存储层是 Android 架构体系中一个可选部分——但如果你的应用有它或类似的层,它应该更愿意暴露常规挂起函数。因为存储层没有一个天然的生命周期——它只是一个对象——它没有办法清理工作。因此,在默认情况下,在存储库中启动的任何协程都会泄露。

除了避免泄露之外,通过暴露常规挂起函数,还可以很容易地在不同的上下文中复用存储库。任何知道如何创造协程的东西都可以调用 loadSortedProducts 。例如,Workmanager 调度的后台 Job 可以直接调用它。

存储库应该暴露出主线程安全的常规挂起函数。

注意:一些后台保存操作可能会希望用户离开界面后继续执行——在没有生命周期的情况下运行这些保存是有意义的。在大多数其他情况下,viewModelScope 是一个合理的选择。

继续看 ProductsDao,它看起来是这样的:

@Dao
interface ProductsDao {
   // 因为这是挂起的,Room 将使用它自己的调度器以主线程安全的方式运行这个查询
   @Query("select * from ProductListing ORDER BY dateStocked ASC")
   suspend fun loadProductsByDateStockedAscending(): List<ProductListing>

   // 因为这是挂起的,Room 将使用它自己的调度器以主线程安全的方式运行这个查询
   @Query("select * from ProductListing ORDER BY dateStocked DESC")
   suspend fun loadProductsByDateStockedDescending(): List<ProductListing>
}  

ProductsDao 是一个 Room @Dao ,它公开了两个挂起函数。由于函数被suspend 标记,Room 确保它们是主线程安全的。这意味着你可以直接从 Dispatchers.Main 来调用它们。

如果你还没有在 Room 中看到协程,请查看 @FMuntenescu的这篇很棒的文章

Room 🔗 Coroutines

不过有一点注意,调用它的协程将位于主线程上。因此,如果你对结果做了一些花销大的操作(比如将它们转换为一个新列表),你应该确保它没有阻塞主线程。

注意:Room 使用自己的调度器在后台线程上运行查询。你的代码不应该用 withContext(Dispatchers.IO) 来调用 Room 的挂起查询函数。这会使代码变得复杂,使查询运行的更慢。

Room 的挂起函数是主线程安全的,并在自定义调度器上运行。

一次性请求模式

这是在 Android 架构组件中使用协程发出一次性请求的完整模式。我们将协程添加到 ViewModel、Repository 和 Room 中,每一层都有不同的职责。

  1. ViewModel 在主线程上启动一个协程——当它得到响应时,它就完成了。

  2. Repository 暴露常规挂起函数,并确保它们是主线程安全的。

  3. 数据库和网络暴露常规挂起函数,并确保它们是主线程安全的。

ViewModel 负责启动协程,并确保在用户离开界面时它们被取消。它不做大开销的工作——而是依靠其他层来完成繁重的工作。一旦有了响应,它就使用 LiveData 将其发送到 UI。由于 ViewModel 不做大开销的工作,所以它在主线程上启动协程。因为运行在主线程上,如果响应立即可用(例如从内存缓存中获取),它可以更快的响应用户事件。

存储库通过暴露挂起函数来外界访问数据。它通常不会自己启动一个长生命周期的协程,因为没有任何办法取消它们。每当存储库必须做一些开销大的事情,比如转换列表时,它应该使用 withContext 来暴露一个主线程安全的接口。

数据层(网络或数据库) 总是暴露常规挂起函数。使用 Kotlin 协程时,这些挂起函数是主线程安全的,这一点很重要,Room 和 Retrofit 都遵循这种模式。

在一次性请求中,数据层只暴露挂起函数。如果调用者想要一个新的数据,则必须再调用它们。这就像 web 浏览器上的刷新按钮一样。

值的花点时间来确保你理解这些一次性请求的模式。这是 Android 上协程的正常模式,你会一直使用它。

我们的第一个 Bug 报告!

在测试了该解决方案之后,你将其投入生产,并且在接下来的几周内一切都很顺利,直到你得到一个非常奇怪的 Bug 报告:

主题:🐞——错误的排序顺序!

报告:当我非常非常非常快地点击排序按钮时,有时排序是错误的。这个问题偶尔才会发生🙃。

你看了看,挠挠头。有什么地方可能出错呢?流程看起来相当简单:

  1. 用户启动排序请求
  2. 在 Room 调度器中执行排序
  3. 显示排序的结果

你很想用 "不会修复-不要按按钮那么快"来关闭这个 Bug,但是你担心可能有哪里不对。在添加打印 Log 并编写了一个测试来同时调用多个排序之后——你终于找到了答案!

最终显示的结果实际上不是"排序的结果",而是"最后一个完成的排序"的结果。"当用户重复点击按钮时,它们会同时启动多个排序任务,并且可能得到任意排序的结果"。

在响应 UI 事件启动一个新的协程时,请考虑如果用户在这个事件完成之前启动了另一个协程会发生什么。

这是一个并发性 Bug,它实际上与协程没有关系。如果我们以同样的方式使用回调、Rx,甚至是 ExecutorService 也会有同样的 Bug。

在 ViewModel 和存储库中,有很多办法可以修复这个问题。让我们研究一些模式,以确保按用户期望的顺序完成一个一次性请求。

最佳解决方案:禁用按钮

问题的根本原因是我们同时执行了两次排序。我们可以通过让它同时只做一次排序来解决这个问题!最简单的办法是在合适的时候禁用排序按钮。

这似乎是一个简单的解决方案,但它确实是一个好主意。实现这一点的代码很简单,并且易于测试,只要它的 UI 不是无厘头的,就完全可以解决问题!

要禁用按钮,那么就告诉 UI 排序请求正在 sortPricesBy 中进行,如下所示:

// 0 号解决方案:在运行任何排序时禁用排序按钮

class ProductsViewModel(val productsRepository: ProductsRepository): ViewModel() {
   val _sortedProducts = MutableLiveData<List<ProductListing>>()
   val sortedProducts: LiveData<List<ProductListing>> = _sortedProducts
  
   val _sortButtonsEnabled = MutableLiveData<Boolean>()
   val sortButtonsEnabled: LiveData<Boolean> = _sortButtonsEnabled
  
   init {
       _sortButtonsEnabled.value = true
   }

   /**
    * 在用户点击合适的排序按钮时,由 UI 层调用
    */
   fun onSortAscending() = sortPricesBy(ascending = true)
   fun onSortDescending() = sortPricesBy(ascending = false)

   private fun sortPricesBy(ascending: Boolean) {
       viewModelScope.launch {
           // 在排序进行时禁用按钮
           _sortButtonsEnabled.value = false
           try {
               _sortedProducts.value =
                       productsRepository.loadSortedProducts(ascending)
           } finally {
               // 在排序完成时恢复按钮
               _sortButtonsEnabled.value = true
           }
       }
   }
}

这样还不错。只需要在调用了存储库的sortPricesBy ,排序开始时就会禁用排序按钮。

在大多数情况下,这是解决这个问题的正确方法。但是如果我们想让按钮保持启用状态并修复 Bug 呢?这有点难,我们将在接下来的部分探索一些不同的选项。

重要提示:这段代码显示了在主线程上启动协程的一个主要优势——按钮在点击时立刻禁用。如果你切换调度器,在低端手机上进行快速操作的用户可以发送不止一个点击事件!

并发模式

接下来几节将探索高级主题——如果你刚刚开始使用协程,那么你不需要立刻理解它们。简单地禁用按钮时你将遇到的大多数问题的最佳解决方案。

在这篇文章的其余部分,我们将探讨如何在按钮可用时,但又确保一次性请求的执行顺序不会让用户感到意外的情况下使用协程。我们可以通过控制协程何时运行(或不运行)来避免意外的并发情况。

对于一次性请求,可以使用三种基本模式来确保每次只运行一个请求。

  1. 在开始更多的工作前,先取消之前的工作。
  2. 将下一个工作排队,等待前面的请求完成后继续再启动另一个。
  3. 如果已经有一个请求在运行,那么让之前的工作执行完毕然后返回之前的工作结果,而不启动后来的一个请求。

当你查看这些解决方案时,你会注意到它们的实现有些复杂。为了关注如何使用这些模式而不是实现细节,我 创建了一个gist,将所有的三个模式的实现都作为可复用的抽象。

1 号解决方案:取消之前的工作

在排序时,从用户获得一个新事件通常意味着可以取消最后一个排序。毕竟,如果用户已经告诉你他们不想要前面那个结果,那么继续下去又有什么意义呢?

要取消之前的请求,我们需要以某种方式跟踪它。函数cancelPreviousThenRungist中就是这样做的。

让我们来看看如何用它来修复 Bug:

// 1 号解决方案:取消之前的工作

// 对于排序和过滤这样的任务,这是一个很好的解决方案,如果有新的请求进来,可以取消这些任务

class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
   var controlledRunner = ControlledRunner<List<ProductListing>>()

   suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
       // cancel the previous sorts before starting a new one
       return controlledRunner.cancelPreviousThenRun {
           if (ascending) {
               productsDao.loadProductsByDateStockedAscending()
           } else {
               productsDao.loadProductsByDateStockedDescending()
           }
       }
   }
}

查看 gist 中的 cancelPreviousThenRun 的示例实现是了解如何跟踪正在进行工作的好办法。

// 查看完整的实现在 https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7
suspend fun cancelPreviousThenRun(block: suspend () -> T): T {
   // 如果有 activeTask,取消它,因为不在需要它的结果
   activeTask?.cancelAndJoin()
   
   // ...

简而言之,它总是跟踪成员变量 activeTask 中当前活动的排序。每当排序开始时,它将立即调用activeTaskcancelAndJoin。这样做的效果是,在开始一个新的排序之前,取消任何正在进行的排序。

使用类似 ControlledRunner<T> 的抽象来封装这样的逻辑是一个好主意,而不是将特别的并发性与应用程序逻辑混合在一起。

考虑构建抽象,以避免将特别的并发模式与应用程序代码搞混。

重要说明:此模式不适合在全局单例中使用,因为不相关的调用方不应该相互取消。

2 号解决方案:排队进行

有一种解决并发 bug 的办法总是有效的。只要把请求排队,一次只能发生一件事!就像商店中的队列一样,请求将按启动的顺序依次执行。

对于这个特殊的排序问题,取消可能比排队更好,但是排队进行还是值得讨论,因为它也是有用的。

// 2 号解决方案:添加互斥锁

// 注意:这对于排序或过滤的特定用例不是最优的,但是对于网络保存是一种很好的模式
class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
   val singleRunner = SingleRunner()

   suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
       // wait for the previous sort to complete before starting a new one
       return singleRunner.afterPrevious {
           if (ascending) {
               productsDao.loadProductsByDateStockedAscending()
           } else {
               productsDao.loadProductsByDateStockedDescending()
           }
       }
   }
}

每当出现新的排序时,它都会使用SingleRunner 实例来确保一次只运行一个排序。它使用 Mutex,这是一个单一的票据(或锁),协程必须获得它才能进入代码块。如果在一个协程正在运行时尝试启动另一个协程,它将挂起自己,直到所有等待的协程都完成了。

Mutex 允许你确保一次只运行一个协程——并且它们会按启动的顺序结束。

3 号解决方案:返回前面的工作结果

第三个解决方案是使用前面的已经进行工作。如果新请求将重新启动已经完成一半的相同工作,这是一个好主意。

这种模式对于 sort 函数没有太大意义,但是对于加载网络数据来说,它是一种合适的选择。

对于我们的产品目录应用,用户需要一种办法从服务器来获取一个新的产品目录。作为一个简单的 UI 我们将为它提供一个刷新按钮,它们可以按这个按钮启动一个新的网络请求。

与排序按钮一样,只要在请求运行时禁用按钮,就可以完全解决这个问题。但是如果我们不这样做,或者不能这样做,我们可以加入现有的请求。

让我们来看看一些使用 joinPreviousOrRun 的 gist 代码,看看它是如何工作的。

class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
   var controlledRunner = ControlledRunner<List<ProductListing>>()

   suspend fun fetchProductsFromBackend(): List<ProductListing> {
       // 如果已经有一个请求,则返回现有的请求的结果。如果没有则通过允许该块来启动一个新请求。
       return controlledRunner.joinPreviousOrRun {
           val result = productsApi.getProducts()
           productsDao.insertAll(result)
           result
       }
   }
}

这将反转 cancelPreviousAndRun 的行为。它将丢弃新请求并避免运行它,而不是通过取消它来丢弃以前的请求。如果已经有一个请求在运行,它将等待当前"正在执行"的请求的结果,并返回该结果,而不是运行一个新请求。传入的代码块只有在没有任何正在运行的请求时才会被执行。

你可以在 joinPreviousOrRun 开始时看到它是如何工作的——如果 activeTask 已经存在,那么它只返回之前这个的结果:

// 查看完整的代码 https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L124

suspend fun joinPreviousOrRun(block: suspend () -> T): T {
    // 如果有 activeTask ,返回它的结果,不要运行新的代码块
    activeTask?.let {
        return it.await()
    }
    // ...

这种模式适用于像按 id 获取产品这样的请求。你可以维护一个id 对应 Deferred 的 Map,然后使用相同的结合逻辑跟踪相同产品的先前的请求。

结合之前的工作是避免重复网络请求的一个很好的解决方案。

What’s next?

在本文中,我们探讨了如何使用 Kotlin 协程实现一次性请求。首先,我们实现了一个完整的模式,展示了如何在 ViewModel 中启动协程,然后从存储库和 Room Dao 中暴露常规挂起函数。

对于大多数任务,为了在 Android 上使用 Kotlin 协程,这就是你需要做的全部工作。这种模式可以应用于许多常见的任务,比如我们这里展示的排序列表。你还可以使用它来获取、保存或更新网络上的数据。

然后我们研究了一个可能出现的小错误和可能会用到的解决方案。修复这个问题最简单(通常也是最好)的办法是在 UI 中——只要在排序过程中禁用排序按钮。

最后,我们研究了一些高级并发模式以及如何在 Kotlin 协程中实现它们。这方面的代码有点复杂,但它确实为一些高级协程主题提供了很好的介绍。

在下一篇文章中,我们将研究流请求,并探索如何使用 LiveData 构建器!