最全面的Kotlin协程: Coroutine/Channel/Flow 以及实际应用

34,064 阅读31分钟

协程这个概念在1958年就开始出现, 比线程更早, 目前很多语言开始原生支, Java没有原生协程但是可以大型公司都自己或者使用第三方库来支持协程编程, 但是Kotlin原生支持协程.

本文更新自: 2021/8/18 协程_v1.5 kotlin_v1.5.21

我认为协程的核心就是一个词: 作用域, 理解什么是作用域就理解协程了

什么是协程

协程是协作式任务, 线程是抢占式任务, 本质上两者都属于并发

Kotlin协程就是线程库不是协程? 内部代码用的线程池?

  1. 最知名的协程语言Go内部也是维护了线程
  2. 协程只是方便开发者处理异步(可以减少线程数量), 线程才能发挥性能
  3. 协程是一种概念, 无关乎具体实现方式
  4. kotlin标准库中的协程不包含线程池代码, 仅扩展库才内部实现线程池

协程设计来源

  1. Kotlin的协程完美复刻了谷歌的Go语言的协程设计模式(作用域/channel/select), 将作用域用对象来具化出来; 且可以更好地控制作用域生命周期;
  2. await模式(JavaScript的异步任务解决方案)
  3. Kotlin参考RxJava响应式框架创造出Flow
  4. 使用协程开始就不需要考虑线程的问题, 只需要在不同场景使用不同的调度器(调度器会对特定任务进行优化)就好

特性

使用场景

假设首页存在七个接口网络请求(后端人员处理差)的情况一个个使用串行网络请求的时间比并发网络请求慢了接近七倍.

不是说这种并发只能协程实现, 但是协程实现是目前最优解

目前计算机都是通过多核CPU提升计算能力, 所以熟练掌握并发编程是未来的趋势

协程优势

  1. 并发实现方便
  2. 没有回调嵌套发生. 代码结构清晰
  3. 易于封装扩展
  4. 创建协程性能开销优于创建线程, 一个线程可以运行多个协程, 单线程即可异步

实验特性

协程在Kotlin1.3时候放出正式版本, 目前仍然存在不稳定函数(不影响项目开发), 通过注解标识

@FlowPreview 代表可能以后存在Api函数变动

@ExperimentalCoroutinesApi  代表目前可能存在不稳定的因素的函数

@ObsoleteCoroutinesApi 可能存在被废弃的可能

构成

Kotlin的协程主要构成分为三部分

  1. CoroutineScope 协程作用域: 每个协程体都存在一个作用域, 异步还是同步由该作用域决定
  2. Channel 通道: 数据如同一个通道进行发送和接收, 可以在协程之间互相传递数据或者控制阻塞和继续
  3. Flow 响应流: 类似RxJava等结构写法

推荐项目架构 MVVM + Kotlin + Coroutine + JetPack主要带来的优势;

  1. 简洁, 减少70%左右代码
  2. 双向数据绑定(DataBinding)
  3. 并发异步任务(网络)倍增速度
  4. 更健壮的数据保存和恢复

如果你想取代RxJava那么以下两个库我强烈推荐

框架描述
Net专为Android设计的协程并发网络请求库, 其中计时器/轮询器也有使用协程Channel设计
Channel基于协程/LiveData实现的事件分发框架

依赖

这里我们使用协程扩展库, kotlin标准库的协程太过于简陋不适用于开发者使用

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.0"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.0"

创建协程

开启主协程的分为三种方式

生命周期和App一致, 无法取消(不存在Job), 不存在线程阻塞

fun main() {
    GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L)
        println("World!")
    }
    Thread.sleep(2000) // 防止JVM虚拟机退出
}

这里说的是GlobalScope没有Job, 但是启动的launch都是拥有Job的. GlobalScope本身就是一个作用域, launch属于其子作用域;

不存在线程阻塞, 可以取消, 可以通过CoroutineContext控制协程生命周期

fun main() {
    CoroutineScope(Dispatchers.IO).launch {
    }
    Thread.sleep(1000)
}

线程阻塞, 适用于单元测试, 不需要延迟阻塞防止JVM虚拟机退出. runBlocking属于全局函数可以在任意地方调用

一般我们在项目中是不会使用runBlocking, 因为阻塞主线程没有开启的任何意义

fun main() = runBlocking { 
    // 阻塞线程直到协程作用域内部所有协程执行完毕
}

创建作用域

协程内部还可以使用函数创建其他协程作用域, 分为两种创建函数:

  1. CoroutineScope的扩展函数, 只有在作用域内部才能创建其他的作用域
  2. suspend修饰的函数内部
  3. 协程永远会等待其内部作用域内所有协程都执行完毕后才会关闭协程

在主协程内还可以创建子协程作用域, 创建函数分为两种

  1. 阻塞作用域(串行): 会阻塞当前作用域

  2. 挂起作用域(并发): 不会阻塞当前作用域

同步作用域函数

都属于suspend函数

  • withContext 可以切换调度器, 有返回结果
  • coroutineScope 创建一个协程作用域, 该作用域会阻塞当前所在作用域并且等待其子协程执行完才会恢复, 有返回结果
  • supervisorScope 使用SupervisorJob的coroutineScope, 异常不会取消父协程
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T
// 返回结果; 可以和当前协程的父协程存在交互关系, 主要作用为来回切换调度器

public suspend inline operator fun <T> CoroutineDispatcher.invoke(
    noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
// withContext工具函数而已

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

public suspend fun <R>  supervisorScope(block: suspend CoroutineScope.() -> R): R

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

异步作用域函数

这两个函数都不属于suspend, 只需要CoroutineScope就可以调用

  • launch: 异步并发, 没有返回结果
  • async: 异步并发, 有返回结果
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

并发

同一个协程作用域中的异步任务遵守顺序原则开始执行; 适用于串行网络请求, 在一个异步任务需要上个异步任务的结果时.

协程挂起需要时间, 所以异步协程永远比同步代码执行慢

fun main() = runBlocking<Unit> {
    launch {
        System.err.println("(Main.kt:34)    后执行")
    }

    System.err.println("(Main.kt:37)    先执行")
}

当在协程作用域中使用async函数时可以创建并发任务

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

示例

fun main() = runBlocking<Unit> {
	val name = async { getName() }
    val title = async { getTitle() }

    System.err.println("(Main.kt:35)    result = ${name.await() + title.await()}")
    delay(2000)
}
  1. 返回对象Deferred; 通过函数await获取结果值
  2. Deferred集合还可以使用awaitAll()等待全部完成
  3. 不执行await任务也会等待执行完协程关闭
  4. 如果Deferred不执行await函数则async内部抛出的异常不会被logCat或tryCatch捕获, 但是依然会导致作用域取消和异常崩溃; 但当执 行await时异常信息会重新抛出

惰性并发

将async函数中的start设置为CoroutineStart.LAZY时则只有调用Deferred对象的await时才会开始执行异步任务(或者执行start函数)

启动模式

  1. DEFAULT 立即执行
  2. LAZY 直到Job执行start或者join才开始执行
  3. ATOMIC 在作用域开始执行之前无法取消
  4. UNDISPATCHED 不执行任何调度器, 直接在当前线程中执行, 但是会根据第一个挂起函数的调度器切换

异常

协程中发生异常, 则父协程取消并且父协程其他的子协程同样全部取消

Deferred

继承自Job

提供一个全局函数用于创建CompletableDeferred对象, 该对象可以实现自定义Deferred功能

public suspend fun await(): T 
// 结果
public val onAwait: SelectClause1<T>
// 在select中使用

public fun getCompleted(): T
// 如果完成[isCompleted]则返回结果, 否则抛出异常
public fun getCompletionExceptionOrNull(): Throwable?
// 如果完成[isCompleted]则返回结果, 否则抛出异常

示例

fun main() = runBlocking<Unit> {
    val deferred = CompletableDeferred<Int>()
    
    launch {
        delay(1000 )
        deferred.complete(23)
    }

    System.err.println("(Demo.kt:72)    结果 = ${deferred.await()}")
}

创建CompletableDeferred的顶层函数

public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>
public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>

CompletableDeferred函数

public fun complete(value: T): Boolean
// 结果

public fun completeExceptionally(exception: Throwable): Boolean
// 抛出异常, 异常发生在`await()`时

public fun <T> CompletableDeferred<T>.completeWith(result: Result<T>): Boolean
// 可以通过标记来判断是否成功, 避免异常抛出

CoroutineScope

创建此对象表示创建一个协程作用域

结构化并发

如果你看协程的教程可能会经常看到这个词, 这就是作用域内部开启新的协程; 父协程会限制子协程的生命周期, 子协程承接父协程的上下文, 这种层级关系就是结构化并发

在一个协程作用域里面开启多个子协程进行并发行为

CoroutineContext

协程上下文, 我认为协程上下文可以看做包含协程基本信息的一个Context(上下文), 其可以决定协程的名称或者运行

创建一个新的调度器

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher

创建新的调度器比较消耗资源, 建议复用且当不需要的时候使用close函数释放

调度器

Dispatchers继承自CoroutineContext, 该枚举拥有三个实现; 表示不同的线程调度; 当函数不使用调度器时承接当前作用域的调度器

  1. Dispatchers.Unconfined 不指定线程, 如果子协程切换线程那么接下来的代码也运行在该线程上
  2. Dispatchers.IO 适用于IO读写
  3. Dispatchers.Main 根据平台不同而有所差, Android上为主线程
  4. Dispatchers.Default 默认调度器, 在线程池中执行协程体, 适用于计算操作

立即执行

Dispatchers.Main.immediate

immediate属于所有调度器都有的属性, 该属性代表着如果当前正处于该调度器中不执行调度器切换直接执行, 可以理解为在同一调度器内属于同步协程作用域

例如launch函数开启作用域会比后续代码执行顺序低, 但是使用该属性协程属于顺序执行

示例

CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
	// 执行顺序 1
}

// 执行顺序 2

CoroutineScope(Job() + Dispatchers.Main).launch {
		// 执行顺序 4
}

// 执行顺序 3

协程命名

通过创建一个CoroutineName对象, 在构造函数中指定参数为协程名称, CoroutineName继承自CoroutineContext.

launch(CoroutineName("吴彦祖")){

}

协程上下文名称用于方便调试使用

协程挂起

yield函数可以让当前协程暂时挂起执行其他协程体, 如果没有其他正在并发的协程体则继续执行当前协程体(相当于无效调用)

public suspend fun yield(): Unit

看协程中可能经常提及挂起, 挂起可以理解为这段代码(作用域)暂停, 然后执行后续代码; 挂起函数一般表示suspend关键字修饰的函数, suspend要求只允许在suspend修饰的函数内部调用, 但是本身这个关键字是没做任何事的. 只是为了限制开发者随意调用

挂起函数调用会在左侧行号列显示箭头图标

image-20200106120117080

JOB

在协程中Job通常被称为作业, 表示一个协程工作任务, 他同样继承自CoroutineContext

val job = launch {

}

Job属于接口

interface Job : CoroutineContext.Element

函数

public suspend fun join()
// 等待协程执行完毕都阻塞当前线程
public val onJoin: SelectClause0
// 后面提及的选择器中使用

public fun cancel(cause: CancellationException? = null)
// 取消协程
public suspend fun Job.cancelAndJoin()
// 阻塞并且在协程结束以后取消协程

public fun start(): Boolean
public val children: Sequence<Job>
// 全部子作业

public fun getCancellationException(): CancellationException

public fun invokeOnCompletion(
  onCancelling: Boolean = false, 
  invokeImmediately: Boolean = true, 
  handler: CompletionHandler): DisposableHandle
// p1: 当为true表示cancel不会回调handler
// p2: 当为true则先执行[handler]然后再返回[DisposableHandle], 为false则先返回[DisposableHandle]

public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 当其作用域完成以后执行, 主协程指定才有效, 直接给CoroutineScope指定时无效的
// 手动抛出CancellationException同样会赋值给cause

状态

通过字段可以获取JOB当前处于状态

public val isActive: Boolean
public val isCancelled: Boolean
public val isCompleted: Boolean

扩展函数

public fun Job.cancelChildren(cause: CancellationException? = null)

public suspend fun Job.cancelAndJoin()

每个协程作用域都存在coroutineContext. 而协程上下文中都存在Job对象

coroutineContext[Job]

结束协程

如果协程作用域内存在计算任务(一直打日志也算)则无法被取消, 如果使用delay函数则可以被取消;

fun main() = runBlocking<Unit> {

  val job = launch(Dispatchers.Default) {
    while (true){
      delay(100) // 这行代码存在则可以成功取消协程, 不存在则无法取消
      System.err.println("(Main.kt:30)    ")
    }
  }
  
  delay(500)
  job.cancel() 
  System.err.println("(Main.kt:42)    结束")
}

通过使用协程内部isActive属性来判断是否应该结束

fun main() = runBlocking<Unit> {

    val job = launch(Dispatchers.Default) {
        while (isActive) { // 一旦协程被取消则为false
            System.err.println("(Main.kt:30)    ")
        }
    }

    delay(500)
    job.cancel()
    System.err.println("(Main.kt:42)    结束")
}

释放资源

协程存在被手动取消的情况, 但是有些资源需要在协程取消的时候释放资源, 这个操作可以在finally中执行

无论如何finally都会被执行

fun main() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        try {
            repeat(1000){
                System.err.println("(Main.kt:31)    it = $it")
                delay(500)
            }
        } finally {
           // 已被取消的协程无法继续挂起
        }
    }
    delay(1500)
    job.cancel()
    System.err.println("(Main.kt:42)    ")
}

再次开启协程

通过withContextNonCancellable可以在已被取消的协程中继续挂起协程; 这种用法其实可以看做创建一个无法取消的任务

withContext(NonCancellable) {
    println("job: I'm running finally")
    delay(1000L)
    println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}

上下文组合

协程作用域可以接收多个CoroutineContext作为上下文参数; CoroutineContext本身属于接口, 很多上下文相关的类都实现与他

配置多个CoroutineContext可以通过+符号同时指定多个协程上下文, 每个实现对象可能包含一部分信息可以存在覆盖行为故相加时的顺序存在覆盖行为

val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
  delay(1000)
  System.err.println("(Main.kt:51)    ${Thread.currentThread()}")
}
launch(Dispatchers.IO + CoroutineName("吴彦祖")){	}

协程局部变量

使用ThreadLocal可以获取线程的局部变量, 但是要求使用扩展函数asContextElement转为协程上下文作为参数传入在创建协程的时候

该局部变量作用于持有该协程上下文的协程作用域内

public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> = ThreadLocalElement(value, this)

超时

public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超过指定时间timeMillis自动结束协程; 
// 当没有超时时返回值获取并且继续执行协程; 
// 当超时会抛出异常TimeoutCancellationException, 但是不会导致程序结束

public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 如果超时不会结束协程而是返回null

无法手动抛出TimeoutCancellationException, 因为其构造函数私有

全局协程作用域

全局协程作用域属于单例对象, 整个JVM虚拟机只有一份实例对象; 他的寿命周期也跟随JVM. 使用全局协程作用域的时候注意避免内存泄漏

public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext];
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

全局协程作用域不继承父协程作用域的上下文, 所以也不会因为父协程被取消而自身被取消

启动模式

  • DEFAULT 立即执行协程体
  • ATOMIC 立即执行协程体,但在开始执行协程之前无法取消协程
  • UNDISPATCHED 立即在当前线程执行协程体,第一个挂起函数执行在函数所在线程, 后面执行在函数指定线程
  • LAZY 手动执行startjoin才会执行协程

协程取消

协程体如果已经执行实际上属于不可取消的, 在协程体中通过isActive来判断协程是否处于活跃中

通过取消函数的参数指定异常CancellationException可以自定义异常对象

不可取消的协程作用域

NonCancellable该单例对象用于withContext函数创建一个无法被取消的协程作用域

withContext(NonCancellable) {
  delay(2000)
}

示例

fun main() = runBlocking {
  launch {
    delay(1000)
    System.err.println("(Main.kt:19)    ")
  }

  launch {
    withContext(NonCancellable) {
      delay(2000)
      System.err.println("(Main.kt:26)    ")
    }
  }

  delay(500) // 防止launch还未开启withContext就被取消
  cancel()
}
  1. 当子作用域内包含没有终止的任务, 将等待任务完成后才会取消(delay不存在, Thread.sleep可以模拟未结束的任务)
  2. 抛出CancellationException视作结束异常, invokeOnCompletion也会执行(其中包含异常对象), 但是其他异常将不会执行invokeOnCompletion

取消GlobalScope

GlobalScope属于全局协程, 由他开启的协程都不拥有Job, 所以无法取消协程. 但是可以通过给GlobalScope开启的协程作用域指定Job然后就可以使用Job取消协程

协程异常

通过CoroutineExceptionHandler函数可以创建一个同名的对象, 该接口继承自CoroutineContext, 同样通过制定上下文参数传递给全局协程作用域使用, 当作用域抛出异常时会被该对象的回调函数接收到, 并且不会抛出异常

  1. CoroutineExceptionHandler 只有作为最外层的父协程上下文才有效, 因为异常会层层上抛, 除非配合SupervisorJob监督作业禁止异常上抛, 子作用域的异常处理器才能捕获到异常

  2. CoroutineExceptionHandler异常处理器并不能阻止协程作用域取消, 只是监听到协程的异常信息避免JVM抛出异常退出程序而已

  3. 只要发生异常就会导致父协程和其所有子协程都被取消, 这种属于双向的异常取消机制, 后面提到的监督作业(SupervisorJob)属于单向向下传递(即不会向上抛出)

  4. CoroutineExceptionHandler会被作用域一直作为协程上下文向下传递给子作用域(除非子作用域单独指定)

(如下示例)不要尝试使用try/catch捕捉launch作用域的异常, 无法被捕捉.

try {
  launch {
    throw NullPointerException()
  }
} catch (e: Exception) {
  e.printStackTrace()
}

后面专门介绍如何捕获协程异常避免抛出.

协程取消异常

取消协程的作业(Job)会引发异常, 但是会被默认的异常处理器给忽略, 但是我们可以通过捕捉可以看到异常信息

fun main() = runBlocking<Unit> {
  val job = GlobalScope.launch {

    try {
      delay(1000)
    } catch (e: Exception) {
      e.printStackTrace()
    }
  }

  job.cancel(CancellationException("自定义一个用于取消协程的异常"))
  delay(2000)
}

Job取消函数

public fun cancel(cause: CancellationException? = null)
  • cause: 参数不传默认为JobCancellationException

全局协程作用域的异常处理

val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
       System.err.println("(Main.kt:41):main   coroutineContext = $coroutineContext, throwable = $throwable")
}

GlobalScope.launch(exceptionHandler) { 

}

子协程设置异常处理器是无效的, 即使设置了错误依然会抛到父协程从而而没有意义. 除非同时使用异常处理器+监督作业(SupervisorJob), 这样就是让子协程的错误不向上抛(后面详解监督作业), 从而被其内部的异常处理器来处理.

异常聚合和解包

全局协程作用域也存在嵌套子父级关系, 故异常可能也会依次抛出多个异常

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
				// 第三, 这里的异常是第一个被抛出的异常对象
        println("捕捉的异常: $exception 和被嵌套的异常: ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally { // 当父协程被取消时其所有子协程都被取消, finally被取消之前或者完成任务之后一定会执行
                throw ArithmeticException() // 第二, 再次抛出异常, 异常被聚合
            }
        }
        launch {
            delay(100)
            throw IOException() // 第一, 这里抛出异常将导致父协程被取消
        }
        delay(Long.MAX_VALUE)
    }
    job.join() // 避免GlobalScope作用域没有执行完毕JVM虚拟机就退出
}

监督作业

一般情况子协程发生异常会导致父协程被取消, 同时父协程发生异常会取消所有的子协程; 但是有时候子协程发生异常我们并不希望父协程也被取消, 而是仅仅所有子协程取消(仅向下传递异常), 这个使用就是用SupervisorJob作业

创建监督作业对象

fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {
        
        launch(SupervisorJob(coroutineContext[Job]) + CoroutineExceptionHandler { _, _ ->  }) {
            throw NullPointerException()
        }

        delay(500)
        println("( Process.kt:13 )    ")
    }

    println("( Process.kt:16 )    finish")
}
  1. 必须添加CoroutineExceptionHandler处理异常, 否则异常依然会向上传递取消父协程

  2. 直接创建 SupervisorJob() 对象传入作用域中会导致该作用域和父协程生命周期不统一的问题, 即父协程取消以后该子协程依然处于活跃状态, 故需要指定参数为coroutineContext[Job]即传入父协程的作业对象

  3. SupervisorJob仅能捕捉内部协程作用域的异常, 无法直接捕捉内部协程

    supervisorScope {
        // throw NoSuchFieldException() 抛出崩溃
        
        launch {
             throw NoSuchFieldException() // 不会抛出
        }
    }
    

监督作业在withContextasync中添加无效

直接创建一个异常向下传递监督作业的作用域

public suspend fun <R>  supervisorScope(block: suspend CoroutineScope.() -> R): R
  • 该函数属于阻塞
  • 具备返回值
  • supervisorScope函数使用的依然是当前作用域的Job, 所以跟随当前作用域生命周期, 可以被取消
fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {
        
      // 在该作用域内只要设置CoroutineExceptionHandler都仅会向下传递
        supervisorScope {
            launch(CoroutineExceptionHandler { _, _ ->  }) {
                throw NullPointerException()
            }

            launch {
                delay(1000) // 即使上面的launch抛出异常也会继续执行这里
                println("( Process.kt:18 )    ")
            }
        }
    }

    println("( Process.kt:16 )    finish")
}

捕获异常

在作用域中的异常捕获和一般的异常捕获有所区别

  • CoroutineExceptionHandler可以捕获所有子作用域内异常
  • async可以使用监督作业可以捕获内部发生的异常, 但是其await要求trycatch
  • launch要求监督作业配合异常处理器同时使用, 缺一不可
  • withContext/supervisorScope/coroutineScope/select可以trycatch捕获异常

原始协程

函数回调字段描述
suspendCoroutineContinuationResult
suspendCancellableCoroutineCancellableContinuation可取消
suspendAtomicCancellableCoroutineCancellableContinuation可取消

[Continuation]

public val context: CoroutineContext
public fun resumeWith(result: Result<T>)

[CancellableContinuation] -| Continuation

public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean

public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
public fun tryResume(value: T, idempotent: Any? = null): Any?
public fun tryResumeWithException(exception: Throwable): Any?
public fun completeResume(token: Any)

public fun cancel(cause: Throwable? = null): Boolean

public fun invokeOnCancellation(handler: CompletionHandler)
public fun CoroutineDispatcher.resumeUndispatched(value: T)
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)

线程不安全

解决线程不安全问题

  1. 互斥锁
  2. 切换线程实现单线程
  3. Channel

互斥

相当于Java中的Lock替代品: Mutex

创建互斥对象

public fun Mutex(locked: Boolean = false): Mutex
// p: 设置初始状态, 是否立即上锁

使用扩展函数可以自动加锁和解锁

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner: 钥匙

函数

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
public fun holdsLock(owner: Any): Boolean
// owner是否被用于锁
public fun tryLock(owner: Any? = null): Boolean
// 使用owner来上锁, 如果owner已上锁则返回false

Channel

  1. 多个作用域可以通过一个Channel对象来进行数据的发送和接收
  2. Channel设计参考Go语言的chan设计, 可用于控制作用域的阻塞和继续(通过配合select)
  3. 在协程1.5开始出现废弃函数不在此处介绍

Channel属于接口无法直接创建, 我们需要通过函数Channel()来创建其实现类

源码

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
  when (capacity) {
    RENDEZVOUS -> RendezvousChannel() // 无缓存
    UNLIMITED -> LinkedListChannel() // 无限制
    CONFLATED -> ConflatedChannel()  // 合并
    BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
    else -> ArrayChannel(capacity) // 指定缓存大小
  }
  • capacity

    缓冲大小, 默认0
    当Channel发送一条数据时就会挂起通道(不继续执行发送后续代码), 只有在接收这条数据时才会解除挂起继续执行; 但是我们可以设置缓存大小
    

通道允许被遍历获取当前发送数据

val channel = Channel<Int>()

for (c in channel){

}
public suspend fun yield(): Unit

Channel

Channel接口同时实现发送渠道(SendChannel)和接收渠道(ReceiveChannel)两个接口, 所以既能发送又能接收数据

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 

SendChannel

public val isClosedForSend: Boolean
// 是否关闭
public fun close(cause: Throwable? = null): Boolean
// 关闭发送通道

public fun trySend(element: E): ChannelResult<Unit>
// 发送消息, 非suspend函数

public suspend fun send(element: E)
// 发送消息

public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
// 当通道关闭时执行回调

public val onSend: SelectClause2<E, SendChannel<E>>
// 立即发送数据(如果允许), 在select中使用
  • 发送通道关闭后不能继续使用ReceiveChannel接收数据, 会导致ClosedReceiveChannelException抛出
  • 前缀try*等函数表示不是suspend挂起函数, 无需在协程作用域中调用

ReceiveChannel

public val isClosedForReceive: Boolean
// SendChannel是否已经关闭通道, 如果关闭通道以后还存在缓存则会接收完缓存之后返回false

public val isEmpty: Boolean // 通道是否为空

public suspend fun receive(): E
public fun tryReceive(): ChannelResult<E>
public suspend fun receiveCatching(): ChannelResult<E>
// 接受通道事件

public val onReceive: SelectClause1<E> // 如果通道关闭, 抛出异常
public val onReceiveCatching: SelectClause1<ChannelResult<E>>// 此函数不会抛出异常
// 在select中使用的监听器, 推荐使用第三个函数

public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`对象可以判断通道是否已关闭

public fun cancel(cause: CancellationException? = null)
// 关闭通道

public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ChannelAsFlow(this, consume = false)
// 将Chan转成Flow
  1. 通道的发送和接收都会导致作用域被阻塞, 但是发送消息可以通过设置缓存让他不阻塞, 或者取消通道可以让阻塞继续
  2. 通道只允许在挂起函数中发送和接收, 但是创建通道不限制
  3. 关闭通道会导致receive抛出异常
  4. SendChannel执行close函数后不允许再发送或者接收数据, 否则抛出异常
  5. Channel的send | receive函数所在作用域被取消cancel不会导致通道结束(isClosedForReceive返回false)
  6. receive接收而不是遍历则会导致卡住作用域

consume

ReceiveChannel不仅可以通过迭代器来接收事件, 还可以使用consume系列函数来接收事件

本质上consume和迭代没有任何区别只是consume会在发生异常时自动取消通道(通过cancel函数)

源码

public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    var cause: Throwable? = null
    try {
        return block() // 直接返回
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        cancelConsumed(cause) // 如果发生异常取消通道
    }
}

consumeEach函数仅是迭代接收事件且异常自动取消; 一般建议使用consume函数来接收事件

BroadcastChannel

这个通道和一般的通道区别在于他的每个数据可以被每个作用域全部接收到; 默认的通道一个数据被接收后其他的协程是无法再接收到数据的

广播通道通过全局函数创建对象

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>

本身广播通道继承自SendChannel, 只能发送数据, 通过函数可以拿到接收通道

public fun openSubscription(): ReceiveChannel<E>

取消通道

public fun cancel(cause: CancellationException? = null)

将Channel转成BroadcastChannel

fun <E> ReceiveChannel<E>.broadcast(
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E>

通过扩展函数在协程作用域中快速创建一个广播发送通道

public fun <E> CoroutineScope.broadcast(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> 

迭代通道

接收通道实现操作符重载可以使用迭代

public operator fun iterator(): ChannelIterator<E>

示例

for (i in produce){
	// 收到每个发型的消息
}

当多个协程接收同一个渠道数据会依次轮流接收到数据, 渠道对于多个协程是公平的

Produce

上面介绍的属于创建Channel对象来发送和接收数据, 但是还可以通过扩展函数快速创建并返回一个具备发送数据的ReceiveChannel对象

public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
  • context: 可以通过协程上下文决定调度器等信息
  • capacity: 初始化通道空间

ProducerScope 该接口继承自SendChannel以及CoroutineScope, 具备发送通道数据以及协程作用域作用

当produce作用域执行完成会关闭通道, 前面已经提及关闭通道无法继续接收数据

等待取消

该函数会在通道被取消时回调其函数参数, 前面提及协程取消时可以通过finally来释放内存等操作, 但是通道取消无法使用finally只能使用该函数

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) 
// [SendChannel.close] or [ReceiveChannel.cancel] 代表取消通道

Actor

可以通过actor函数创建一个具备通道作用的协程作用域

public fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
  • context: 协程上下文
  • capacity: 通道缓存空间
  • start: 协程启动模式
  • onCompletion: 完成回调
  • block: 回调函数中可以进行发送数据

该函数和produce函数相似,

  1. produce返回ReceiveChannel, 外部进行数据接收; actor返回的SendChannel, 外部进行数据发送
  2. actor的回调函数拥有属性channel:Channel, 既可以发送数据又可以接收数据, produce的属性channel属于SendChannel
  3. 无论是produce或者actor他们的通道都属于Channel, 既可以发送又可以接收数据, 只需要类型强转即可
  4. 本身Channel可以进行双向数据通信, 但是设计produce和actor属于设计思想中的生产者和消费者模式
  5. 他们都属于协程作用域和数据通道的结合

轮循器

无论是RxJava还是协程都支持轮循器的功能, 在我的网络请求库中还赋予了轮循器暂停|继续|多个观察者|重置等功能

这里的协程轮循器就比较简陋

public fun ticker(
    delayMillis: Long,
    initialDelayMillis: Long = delayMillis,
    context: CoroutineContext = EmptyCoroutineContext,
    mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit>

该通道返回的数据是Unit

默认情况下可以理解为通道会在指定间隔时间后一直发送Unit数据

fun main() = runBlocking<Unit> {
  
    val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)

  // 每秒打印
    for (unit in tickerChannel) {
        System.err.println("unit = $unit")
    }
}

但是如果下游不是在发送数据以后立即接收数据, 而是延迟使用receive函数来接收通道数据

TickerMode该枚举拥有两个字段

  • FIXED_PERIOD 默认值, 动态调节通道发送数据的时间间隔, 时间间隔可以看做是上游发送数据的
  • FIXED_DELAY 只有当接收数据后才会开始计算间隔时间, 时间间隔可以看做是下游接收数据的

这个轮循器不支持多订阅|暂停|继续|重置|完成, 但是我的Net库中Interval对象已实现所有功能

Select

select函数回调中监听多个Deferred/Channel的结果, 且只会执行最快接收数据的通道或者结果回调.

动作

在前面的函数介绍中可以看到一系列on{动作}变量, 他们的值全部是SelectClause{数字}接口对象;

[SelectBuilder]

public interface SelectBuilder<in R> {
    public operator fun SelectClause0.invoke(block: suspend () -> R)
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)
    @ExperimentalCoroutinesApi
    public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}

根据这定义的扩展函数就可以直接使用动作

对象使用的函数
SelectClause0onJoin
SelectClause1OnReceive
SelectClause2onSend

示例

@ObsoleteCoroutinesApi
@UseExperimental(InternalCoroutinesApi::class)
suspend fun selectMulti(a: Channel<Int>, b: Channel<Int>): String = select<String> {

    b.onReceive {
        "b $it" // 优先执行第一个, 不是函数原因, 而是顺序
    }

    b.onReceiveOrClosed {
        "b $it"
    }
    
    a.onSend(23) {
        "发送 23"
    }
}

fun main() = runBlocking<Unit> {
    val a = Channel<Int>(1) // 缓冲数量, 避免发送数据时阻塞
    val b = Channel<Int>(1)
    
    launch {
        b.send(24)
        val s = selectMulti(a, b)
        println("结果 = $s")
    }
}
  • onReceive 在关闭通道时会导致抛出异常, 如果不想抛出异常应当使用onReceiveOrClosed来替换
  • onSend 该函数等效于Channel.send, 就是发送一个值, 假设注册多个onSend肯定是第一个先回调返回结果
  • 即使已经有成员被选中(select)也不会导致其他的成员协程作用域结束

[ValueOrClosed]

public val isClosed: Boolean // 通道是否已关闭

public val value: T
public val valueOrNull: T?
// 两者都是获取通道内的值, 但是第2个如果通道关闭不会抛出异常而是返回NULL
  1. 当在select中一个通道同时存在发送和接收监听时, 如果两者都执行到(即select没有被打断都执行到)会导致异常抛出
  2. 如果通道重复监听(多个动作), 优先执行第一个
  3. 关闭通道同样会收到数据, onReceive抛出异常, onReceiveOrClose数据为null

Flow

Flow相似于RxJava同样分为三个部分:

  1. 上游
  2. 操作符
  3. 下游

下游接收事件要求在协程作用域内执行(suspend函数)

创建Flow

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>

示例

fun shoot() = flow {
    for (i in 1.;3) {
        delay(1000) // 假装我们在这里做了一些有用的事情
        emit(i) // 发送下一个值
    }
}
  • 集合或者Sequence都可以通过asFlow函数转成Flow对象

  • 也可以像创建集合一样通过fowOf直接创建Flow对象

  • Channel通道转成Flow

    public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow
    
  • 甚至挂起函数也可以转成Flow

    public fun <T> (suspend () -> T).asFlow(): Flow<T>
    

collect和flow的回调函数本身属于suspend函数可以开启协程作用域

创建Flow的函数

函数描述
flow普通Flow
channelFlow创建通道, 其支持缓冲通道, 允许不同的CorotineContext发送事件
callbackFlow与channelFlow函数除了不使用awaitClose会报错以外没有区别
emptyFlow空的Flow
flowOf直接发送数据

flow的发射函数emit不是线程安全的不允许其他线程调用, 如果需要线程安全请使用channelFlow而不是flow

channelFlow使用send函数发送数据

发射数据示例

flow<Int> {
  emit(23)
}

channelFlow<Int> {
  send(23) // offer(23)
}
  1. offer可以在非suspend函数中使用, send必须在suspend函数中使用
  2. offer存在一个返回值, 假设没有元素空间则会直接返回false, send则会挂起阻塞等待新的元素空间.

Flow在取消作用域时释放资源可以使用callbackFlow. 这里演示注册和取消一个广播AppWidgetProvider

callbackFlow<Int> {
  val appWidgetProvider = AppWidgetProvider()
  registerReceiver(appWidgetProvider, IntentFilter()) // 注册
  awaitClose {  // 该回调会在协程作用域被取消时回调
    unregisterReceiver(appWidgetProvider) // 注销
  }
}.collect { 

}

收集

收集数据

Flow是冷数据, 要求调用函数collect收集数据时才会进行数据的发射; 该系列函数也成为末端操作符;

flow {
  emit(23)
}.collect {
	System.err.println("(Demo.kt:9)    it = $it")
}

查看源码会发现这个emit实际上就是执行collect的参数函数

collect函数表示接收上游发送的数据

public suspend fun Flow<*>.collect() 
// 不做任何处理的收集器, 仅仅为了触发发射数据

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit
// 收集

public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
// 和上个函数的区别是: 如果在下游没有处理完情况下上游继续下个发射会导致上次的下游被取消

public suspend inline fun <T> Flow<T>.collectIndexed(
      crossinline action: suspend (index: Int, value: T) -> Unit
): Unit
// 具备索引和值的收集器

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 将Flow运行在指定的协程作用域内

[FlowCollector] 发射器

public suspend fun emit(value: T)
// 发送一个数据

public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
// 发射另一个flow对象

调度器

调度器

Flow默认使用的是其所在的当前线程或者协程上下文, Flow不允许在内部使用withContext来切换调度器, 而是应该使用flowOn函数

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

该函数改变的是Flow函数内部发射时的线程, 而在collect收集数据时会自动切回创建Flow时的线程

缓存

不需要等待收集执行就立即执行发射数据, 只是数据暂时被缓存而已, 提高性能

默认切换调度器时会自动缓存

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>

合并函数, 这个函数实际上就是buffer, 当下游无法及时处理上游的数据时会丢弃掉该数据

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

合并

将多个事件合并后发送给下游

zip

将两个Flow在回调函数中进行处理返回一个新的值 R

当两个flow的长度不等时只发送最短长度的事件

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

示例

val nums = (1.;3).asFlow().onEach { delay(300) } // 发射数字 1.;3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
    .collect { value -> // 收集并打印
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

combine

public fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R
): Flow<R>
// 组合两个流,在经过第一次发射以后,任意方有新数据来的时候就可以发射,另一方有可能是已经发射过的数据

public fun <T1, T2, R> Flow<T1>.combineTransform(
    flow: Flow<T2>,
    @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>

集合

Flow直接转成集合函数

public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C

叠加

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

public suspend inline fun <T, R> Flow<T>.fold(
    initial: R,
    crossinline operation: suspend (acc: R, value: T) -> R
): R
// `acc`为上次回调函数返回值, 第一次为初始值, 等同于叠加效果; 该函数和reduce的区别就是支持初始值; reduce累计两次元素才会回调函数

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>

转换

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>

public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先发送所有的元素, 然后上游每个元素会导致回调函数中的Flow发送所有元素一次

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同于RxJava的FlatMap

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>
// 串行收集数据

public fun <T> Flow<Flow<T>>.flattenMerge(
  concurrency: Int = DEFAULT_CONCURRENCY
): Flow<T>
// 并发收集数据

public inline fun <T, R> Flow<T>.flatMapLatest(
  @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> 
// 在每次 emit 新的数据以后,会取消先前的 collect

public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>
// 包含元素索引

生命周期

public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
// 开始

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>
// 回调函数中的参数`cause`如果为null表示正常完成没有抛出异常, 反之则抛出异常非正常结束, 
// 和catch函数一样只能监听到上游发生的异常, 但是无法避免异常抛出只能在异常抛出之前执行回调函数

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
// 该函数只能捕获上游异常, 如果异常处于函数调用之下则依然会被抛出

过滤

限制流发送

public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定数量事件
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>

public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丢弃指定数量事件
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回调函数判断是否丢弃或者接收, 只要丢弃或者接收后面就不会继续发送事件(结束流)

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>

public suspend fun <T> Flow<T>.single(): T
// 期待只有一个元素, 否则抛出`IllegalStateException`
public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不抛出异常, 但如果不是仅有元素则返回null

public suspend fun <T> Flow<T>.first(): T
// 如果不存在一个元素则会抛出`NoSuchElementException`
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回调函数判断为true的第一个条件符合的元素

重试

public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE, // 重试次数
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>

public fun <T> Flow<T>.retryWhen(
  predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>

过滤

public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
// 转换函数, 可以在回调函数中发送新的元素

public fun <T, R> Flow<T>.transformLatest(                                      
  @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

scan和reduce的区别在于

  • reduce是全部叠加计算完成后被收集
  • scan是每次叠加一次后收集一次数据

StateFlow/SharedFlow

类关系

SharedFlow

|- MutableSharedFlow

|- StateFlow

​ |- MutableStateFlow

SharedFlow属于热流数据, 既没有收集(collect)情况下也会发送, 然后在收集时进行重放(replay). 可以使用shareIn将冷流转成热流. 也可以直接使用以下函数创建

public fun <T> MutableSharedFlow(
    replay: Int = 0, // 重放数量
    extraBufferCapacity: Int = 0, // 缓存数量(不包含重放数量)
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

使用BufferOverflow

  1. DROP_LATEST 丢弃最新值
  2. DROP_OLDEST 丢失最旧值
  3. SUSPEND 挂起阻塞

StateFlow可以看做在Flow的基础上加上了LiveData的特性. 但是不存在生命周期跟随(除非使用lifecycleScope等生命周期作用域), 一直都可以收集数据

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    // Backing property to avoid state updates from other classes
    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    // The UI collects from this StateFlow to get its state updates
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Update View with the latest favorite news
                // Writes to the value property of MutableStateFlow,
                // adding a new element to the flow and updating all
                // of its collectors
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}

示例

将flow从冷流转换成热流使用函数shareIn

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
)

SharingStarted:

  1. WhileSubscribed 在第一个订阅者出现后开始共享数据,并使数据流永远保持活跃状态
  2. Lazily 存在订阅者时,将使上游提供方保持活跃状态
  3. Eagerly 立即启动提供方

Android

Google发行的Jetpack库中很多组件都附有KTX扩展依赖, 这种依赖主要是增加kotlin和协程支持

Lifecycle

官方提供生命周期协程作用域的快速创建实现;

  • 指定生命周期运行协程
  • 自动在onDestory取消协程

引入ktx依赖库

implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"

当执行到某个生命周期时运行协程

fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job

suspend fun <T> Lifecycle.whenStateAtLeast(
    minState: Lifecycle.State,
    block: suspend CoroutineScope.() -> T
)

这些函数都属于LifecycleLifecycleOwner的扩展函数

LiveData

依赖

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"

提供开发者使用的只有这两个函数, 两个函数功能一样, 只是每个参数接收时间单位不一致

fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeout: Duration,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
  • timeout: 如果liveData的没有处于活跃的观察者则在指定的时间内(单位毫秒)会取消其作用域[block]
  • block: 该作用域只在活跃状态才会触发, 默认在Dispatchers.Main.immediate调度器

liveData作用域具备发射数据和LiveData的作用

interface LiveDataScope<T> {
    /**
     * Set's the [LiveData]'s value to the given [value]; If you've called [emitSource] previously,
     * calling [emit] will remove that source.
     *
     * Note that this function suspends until the value is set on the [LiveData];
     *
     * @param value The new value for the [LiveData]
     *
     * @see emitSource
     */
    suspend fun emit(value: T)

    /**
     * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]; Calling this
     * method will remove any source that was yielded before via [emitSource];
     *
     * @param source The [LiveData] instance whose values will be dispatched from the current
     * [LiveData];
     *
     * @see emit
     * @see MediatorLiveData.addSource
     * @see MediatorLiveData.removeSource
     */
    suspend fun emitSource(source: LiveData<T>): DisposableHandle

    /**
     * References the current value of the [LiveData];
     *
     * If the block never `emit`ed a value, [latestValue] will be `null`; You can use this
     * value to check what was then latest value `emit`ed by your `block` before it got cancelled.
     *
     * Note that if the block called [emitSource], then `latestValue` will be last value
     * dispatched by the `source` [LiveData];
     */
    val latestValue: T?
}
  1. 如果emitSource在emit之前执行则无效
  2. 该作用域会在每次处于活跃状态时都执行一遍, 如果将应用从后台切换到前台则会返回执行该作用域, 但是观察者只会在活跃时才收到数据