阅读 3229

最全面的Kotlin协程: Coroutine/Channel/Flow 以及实际应用

协程这个概念在1958年就开始出现, 目前某些语言开始原生支持(目前主流语言我感觉只有Java完全不支持协程). Java没有原生协程但是可以大型公司都自己或者使用第三方库来支持协程编程, 但是Kotlin原生支持协程.

Android领域的网络请求库一般由Rxjava实现, 包括我自己写的网络请求库同样也是采用的RxJava. 但是这些RxJava实现的网络请求库同样很难方便的实现并发

我认为协程的核心就是一个词: 作用域, 理解什么是作用域就理解协程了

什么是协程:

线程和协程的关系属于一对多关系, 一个线程上允许存在多个协程, 即主线程你也能异步执行代码. 但是让某个线程执行太多协程效率太低下, 所以针对不同的场景建议使用调度器切换线程, 使用协程开始就不需要考虑线程的问题, 只需要在不同场景使用不同的调度器(调度器会对特定任务进行优化)就好, 协程英文名是Coroutine.

特性

使用场景

假设首页存在七个接口网络请求(后端人员处理差)的情况一个个使用串行网络请求的时间比并发网络请求慢了接近七倍.

目前计算机都是通过多核CPU提升计算能力, 所以熟练掌握并发编程是未来的趋势

协程优势

  1. 并发实现方便
  2. 没有回调嵌套发生, 代码结构清晰
  3. 创建协程性能开销优于创建线程, 一个线程可以运行多个协程, 单线程即可异步

实验特性

协程在Kotlin1.3时候放出正式版本, 但是目前仍然存在不稳定的函数变动, 不过这个我认为不影响项目中实际使用

@FlowPreview 代表可能以后存在Api函数变动

@ExperimentalCoroutinesApi  代表目前可能存在不稳定的因素的函数

@ObsoleteCoroutinesApi 可能存在被废弃的可能
复制代码

Kotlin的协程主要构成分为三部分

  1. CoroutineScope 协程作用域: 每个协程体都存在一个作用域, 异步还是同步由该作用域决定
  2. Channel 通道: 数据如同一个通道进行发送和接收, 可以在协程之间互相传递数据
  3. Flow 响应流: 类似RxJava等结构写法

为方便网络请求和简化异步作用域开启可以使用我实现的一个库: Net

1.0+版本为RxJava实现, 2.0+版本为Coroutine实现

本文章后续会根据Kotlin的版本中的协程迭代进行更新

常用事件分发框架为EventBus或者RxBus, 我之前使用RxJava的时候也写了RxBus来使用, 使用协程后我又用协程实现一个: Channel

展望

协程对于后端高并发优势很大, 相信Spring的Kt版本后续会跟进

至于Google的Jetpack基本上都有针对协程扩展

我们公司项目属于 MVVM+Kotlin+Coroutine+JetPack, 最明显的是并发网络请求速度翻倍. 同时代码更加结构清晰

创建协程

开启主协程的三种方式

生命周期和App一致, 无法取消(不存在Job), 不存在线程阻塞

fun main() {
    GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L)
        println("World!")
    }
    Thread.sleep(2000) // 防止JVM虚拟机退出
}
复制代码

这里说的是GlobalScope没有Job, 但是启动的launch都是拥有Job的. GlobalScope本身就是一个作用域, launch属于其子作用域.

不存在线程阻塞, 可以取消, 可以通过CoroutineContext控制协程生命周期

fun main() {
    CoroutineScope(Dispatchers.IO).launch {
    }
    Thread.sleep(1000)
}
复制代码

线程阻塞, 适用于单元测试, 不需要延迟阻塞防止JVM虚拟机退出. runBlocking属于全局函数可以在任意地方调用

一般我们在项目中是不会使用runBlocking, 因为阻塞主线程没有开启的任何意义

fun main() = runBlocking { 
    // 阻塞线程直到协程作用域内部所有协程执行完毕
}
复制代码

创建作用域

协程内部还s可以使用函数创建其他协程作用域, 分为两种创建函数:

  1. CoroutineScope的扩展函数, 只有在作用域内部才能创建其他的作用域
  2. suspend修饰的函数内部
  3. 协程永远会等待其内部作用域内所有协程都执行完毕后才会关闭协程

在主协程内还可以创建子协程作用域, 创建函数分为两种

  1. 阻塞作用域(串行): 会阻塞当前作用域

  2. 挂起作用域(并发): 不会阻塞当前作用域

同步作用域函数

都属于suspend函数

  • withContext: 可以切换调度器, 有返回结果
  • coroutineScope: 创建一个协程作用域, 该作用域会阻塞当前所在作用域并且等待其子协程执行完才会恢复, 有返回结果
  • supervisorScope: 使用SupervisorJob的coroutineScope, 异常不会取消父协程
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T
// 返回结果. 可以和当前协程的父协程存在交互关系, 主要作用为来回切换调度器

public suspend inline operator fun <T> CoroutineDispatcher.invoke(
    noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
// withContext工具函数而已

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

public suspend fun <R>  supervisorScope(block: suspend CoroutineScope.() -> R): R

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
复制代码

异步作用域函数

这两个函数都不属于suspend, 只需要CoroutineScope就可以调用

  • launch: 异步并发, 没有返回结果
  • async: 异步并发, 有返回结果
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job


public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

复制代码

并发

同一个协程作用域中的异步任务遵守顺序原则开始执行. 适用于串行网络请求, 在一个异步任务需要上个异步任务的结果时.

协程挂起需要时间, 所以异步协程永远比同步代码执行慢

fun main() = runBlocking<Unit> {
    launch {
        System.err.println("(Main.kt:34)    后执行")
    }

    System.err.println("(Main.kt:37)    先执行")
}
复制代码

当在协程作用域中使用async函数时可以创建并发任务

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>
复制代码

示例

fun main() = runBlocking<Unit> {
    val name = async { getName() }
    val title = async { getTitle() }

    System.err.println("(Main.kt:35)    result = ${name.await() + title.await()}")
    delay(2000)
}
复制代码
  1. 返回对象Deferred. 通过函数await获取结果值.
  2. Deferred集合还可以使用awaitAll()等待全部完成.
  3. 不执行await任务也会等待执行完协程关闭
  4. 如果Deferred不执行await则async内部抛出的异常不会被logCat和tryCatch捕获, 但是依然会导致作用域取消和异常崩溃. 但当执行await时异常信息会重新抛出.

惰性并发

将async函数中的start设置为CoroutineStart.LAZY时则只有调用Deferred对象的await时才会开始执行异步任务(或者执行start函数).

启动模式

  1. DEFAULT 立即执行
  2. LAZY 直到Job执行start或者join才开始执行
  3. ATOMIC 在作用域开始执行之前无法取消
  4. UNDISPATCHED 不执行任何调度器, 直接在当前线程中执行, 但是会根据第一个挂起函数的调度器切换

异常

协程中发生异常, 则父协程取消并且父协程其他的子协程同样全部取消

Deferred

继承自Job

提供一个全局函数用于创建CompletableDeferred对象, 该对象可以实现自定义Deferred功能

示例

fun main() = runBlocking<Unit> {
    val deferred = CompletableDeferred<Int>()
    
    launch {
        delay(1000 )
        deferred.complete(23)
    }

    System.err.println("(Demo.kt:72)    结果 = ${deferred.await()}")
}
复制代码

创建CompletableDeferred的全局函数

public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>

public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>
复制代码

CompletableDeferred函数

public fun complete(value: T): Boolean
// 结果

public fun completeExceptionally(exception: Throwable): Boolean
// 抛出异常, 异常发生在`await()`时
复制代码

CoroutineScope

创建此对象表示创建一个协程作用域

结构化并发

如果你看协程的教程可能会经常看到这个词, 这就是作用域内部开启新的协程. 父协程会限制子协程的生命周期, 子协程承接父协程的上下文, 这种层级关系就是结构化并发.

在一个协程作用域里面开启多个子协程进行并发行为

CoroutineContext

协程上下文, 我认为协程上下文可以看做包含协程基本信息的一个Context(上下文). 其可以决定协程的名称或者运行

创建一个新的调度器

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher

fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
复制代码

创建新的调度器比较消耗资源, 建议复用且当不需要的时候使用close函数释放

调度器

Dispatchers继承自CoroutineContext, 该枚举拥有三个实现. 表示不同的线程调度. 当函数不使用调度器时承接当前作用域的调度器

  1. Dispatchers.Unconfined 不指定线程,

    如果子协程切换线程那么接下来的代码也运行在该线程上
    复制代码
  2. Dispatchers.IO

    适用于IO读写
    复制代码
  3. Dispatchers.Main

    根据平台不同而有所差, Android上为主线程
    复制代码
  4. Dispatchers.Default 默认调度器

    在线程池中执行协程体, 适用于计算操作
    复制代码

立即执行

Dispatchers.Main.immediate
复制代码

immediate属于所有调度器都有的属性, 该属性代表着如果当前正处于该调度器中不执行调度器切换直接执行, 可以理解为在同一调度器内属于同步协程作用域

例如launch函数开启作用域会比后续代码执行顺序低, 但是使用该属性协程属于顺序执行

示例

CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
	// 执行顺序 1
}

// 执行顺序 2

CoroutineScope(Job() + Dispatchers.Main).launch {
		// 执行顺序 4
}

// 执行顺序 3
复制代码

协程命名

通过创建一个CoroutineName对象, 在构造函数中指定参数为协程名称, CoroutineName继承自CoroutineContext.

launch(CoroutineName("吴彦祖")){

}
复制代码

协程上下文名称用于方便调试使用

协程挂起

yield函数可以让当前协程暂时挂起执行其他协程体, 如果没有其他正在并发的协程体则继续执行当前协程体(相当于无效调用).

public suspend fun yield(): Unit
复制代码

看协程中可能经常提及挂起, 挂起可以理解为这段代码(作用域)暂停, 然后执行后续代码. 挂起函数一般表示suspend关键字修饰的函数, suspend要求只允许在suspend修饰的函数内部调用, 但是本身这个关键字是没做任何事的. 只是为了限制开发者随意调用.

挂起函数调用会在左侧行号列显示这个图标

image-20200106120117080

JOB

在协程中Job通常被称为作业, 表示一个协程工作任务, 他同样继承自CoroutineContext

val job = launch {

}
复制代码

Job属于接口

interface Job : CoroutineContext.Element
复制代码

函数

public suspend fun join()
// 等待协程执行完毕都阻塞当前线程

public fun cancel(cause: CancellationException? = null)
// 取消协程

public suspend fun Job.cancelAndJoin()
// 阻塞并且在协程结束以后取消协程

public fun start(): Boolean

public val children: Sequence<Job>
// 全部子作业

public fun invokeOnCompletion(
  onCancelling: Boolean = false, // true则取消作用域不会执行
  invokeImmediately: Boolean = true, // 
  handler: CompletionHandler): DisposableHandle

public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 当其作用域完成以后执行, 主协程指定才有效, 直接给CoroutineScope指定时无效的
// 手动抛出CancellationException同样会赋值给cause

public fun getCancellationException(): CancellationException

public val onJoin: SelectClause0
public val children: Sequence<Job>
// 后面提及的选择器中使用
复制代码

状态

通过字段可以获取JOB当前处于状态

public val isActive: Boolean

public val isCancelled: Boolean

public val isCompleted: Boolean
复制代码

扩展函数

public fun Job.cancelChildren(cause: CancellationException? = null)

public suspend fun Job.cancelAndJoin()
复制代码

每个协程作用域都存在coroutineContext. 而协程上下文中都存在Job对象

coroutineContext[Job]
复制代码

结束协程

如果协程作用域内存在计算任务(一直打日志也算)则无法被取消, 如果使用delay函数则可以被取消.

fun main() = runBlocking<Unit> {

  val job = launch(Dispatchers.Default) {
    while (true){
      delay(100) // 这行代码存在则可以成功取消协程, 不存在则无法取消
      System.err.println("(Main.kt:30)    ")
    }
  }
  
  delay(500)
  job.cancel() 
  System.err.println("(Main.kt:42)    结束")
}
复制代码

通过使用协程内部isActive属性来判断是否应该结束

fun main() = runBlocking<Unit> {

    val job = launch(Dispatchers.Default) {
        while (isActive) { // 一旦协程被取消则为false
            System.err.println("(Main.kt:30)    ")
        }
    }

    delay(500)
    job.cancel()
    System.err.println("(Main.kt:42)    结束")
}
复制代码

释放资源

协程存在被手动取消的情况, 但是有些资源需要在协程取消的时候释放资源, 这个操作可以在finally中执行.

无论如何finally都会被执行

fun main() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        try {
            repeat(1000){
                System.err.println("(Main.kt:31)    it = $it")
                delay(500)
            }
        } finally {
           // 已被取消的协程无法继续挂起
        }
    }
    delay(1500)
    job.cancel()
    System.err.println("(Main.kt:42)    ")
}
复制代码

再次开启协程

通过withContextNonCancellable可以在已被取消的协程中继续挂起协程. 这种用法其实可以看做创建一个无法取消的任务

withContext(NonCancellable) {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
复制代码

上下文组合

协程作用域可以接收多个CoroutineContext作为上下文参数. CoroutineContext本身属于接口, 很多上下文相关的类都实现与他.

配置多个CoroutineContext可以通过+符号同时指定多个协程上下文, 每个实现对象可能包含一部分信息可以存在覆盖行为故相加时的顺序存在覆盖行为.

val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
  delay(1000)
  System.err.println("(Main.kt:51)    ${Thread.currentThread()}")
}
复制代码
launch(Dispatchers.IO + CoroutineName("吴彦祖")){}
复制代码

协程局部变量

使用ThreadLocal可以获取线程的局部变量, 但是要求使用扩展函数asContextElement转为协程上下文作为参数传入在创建协程的时候.

该局部变量作用于持有该协程上下文的协程作用域内

public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
    ThreadLocalElement(value, this)
复制代码

超时

public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超过指定时间timeMillis自动结束协程. 
// 当没有超时时返回值获取并且继续执行协程. 
// 当超时会抛出异常TimeoutCancellationException, 但是不会导致程序结束

public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 如果超时不会结束协程而是返回null
复制代码

无法手动抛出TimeoutCancellationException, 因为其构造函数私有

全局协程作用域

全局协程作用域属于单例对象, 整个JVM虚拟机只有一份实例对象. 他的寿命周期也跟随JVM. 使用全局协程作用域的时候注意避免内存泄漏

public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
复制代码

全局协程作用域不继承父协程作用域的上下文, 所以也不会因为父协程被取消而自身被取消.

启动模式

  • DEFAULT 立即执行协程体
  • ATOMIC 立即执行协程体,但在开始执行协程之前无法取消协程
  • UNDISPATCHED 立即在当前线程执行协程体,第一个挂起函数执行在函数所在线程, 后面执行在函数指定线程
  • LAZY 手动执行startjoin才会执行协程

协程取消

协程体如果已经执行实际上属于不可取消的, 在协程体中通过isActive来判断协程是否处于活跃中

通过取消函数的参数指定异常CancellationException可以自定义异常对象

不可取消的协程作用域

NonCancellable该单例对象用于withContext函数创建一个无法被取消的协程作用域

withContext(NonCancellable) {
  delay(2000)
}
复制代码

示例

fun main() = runBlocking {
    launch {
        delay(1000)
        System.err.println("(Main.kt:19)    ")
    }

    launch {
        withContext(NonCancellable) {
            delay(2000)
            System.err.println("(Main.kt:26)    ")
        }
    }

    delay(500) // 防止launch还未开启withContext就被取消
    cancel()
}
复制代码

GlobalScope取消

GlobalScope属于全局协程, 由他开启的协程都不拥有Job, 所以无法取消协程. 但是可以通过给GlobalScope开启的协程作用域指定Job然后就可以使用Job取消协程.

协程异常

通过CoroutineExceptionHandler函数可以创建一个同名的对象, 该接口继承自CoroutineContext. 同样通过制定上下文参数传递给全局协程作用域使用, 当作用域抛出异常时会被该对象的回调函数接收到, 并且不会抛出异常.

  1. CoroutineExceptionHandler只有给最外层的父协程传参才有效
  2. CoroutineExceptionHandler异常处理器并不能阻止协程取消, 只是监听到协程的异常信息避免JVM抛出异常退出程序而已
  3. 只要发生异常就会导致父协程和其所有子协程都被取消, 这种属于双向的异常取消机制, 后面提到的监督作业属于发生异常只会取消所有子协程, 属于单向向下传递.
  4. 对于async/withContext, CoroutineExceptionHandler都是无效的, 无法捕捉到异常, 他们应该使用try/catch.

不要尝试使用try/catch捕捉launch作用域的异常, 捕捉不到依然崩溃

try {
  launch {
    throw NullPointerException()
  }
} catch (e: Exception) {
  e.printStackTrace()
}
复制代码

协程取消异常

取消协程的作业(Job)会引发异常, 但是会被默认的异常处理器给忽略, 但是我们可以通过捕捉可以看到异常信息

fun main() = runBlocking<Unit> {
    val job = GlobalScope.launch {

        try {
            delay(1000)
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    job.cancel(CancellationException("自定义一个用于取消协程的异常"))

    delay(2000)
}
复制代码

Job取消函数

public fun cancel(cause: CancellationException? = null)
复制代码
  • cause: 参数不传默认为JobCancellationException

全局协程作用域的异常处理

val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
                                                  System.err.println("(Main.kt:41):main   coroutineContext = $coroutineContext, throwable = $throwable")
                                                 }

GlobalScope.launch(exceptionHandler) { 

}
复制代码

子协程无法设置异常处理器, 即使设置了也会被父协程覆盖而没有意义. 除非使用异常处理器+Job对象, 但是第一个子协程launch允许设置

异常聚合和解包

全局协程作用域也存在嵌套子父级关系, 故异常可能也会依次抛出多个异常

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
				// 这里的异常是第一个被抛出的异常对象
        println("捕捉的异常: $exception 和被嵌套的异常: ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally { // 当父协程被取消时其所有子协程都被取消, finally被取消之前或者完成任务之后一定会执行
                throw ArithmeticException() // 再次抛出异常, 异常被聚合
            }
        }
        launch {
            delay(100)
            throw IOException() // 这里抛出异常将导致父协程被取消
        }
        delay(Long.MAX_VALUE)
    }
    job.join() // 避免GlobalScope作用域没有执行完毕JVM虚拟机就退出
}
复制代码

监督作业

一般情况子协程发生异常会导致父协程被取消, 同时父协程发生异常会取消所有的子协程. 但是有时候子协程发生异常我们并不希望父协程也被取消, 而是仅仅所有子协程取消(仅向下传递异常), 这个使用就是用SupervisorJob作业.

创建监督作业对象

fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {
        
        launch(SupervisorJob(coroutineContext[Job]) + CoroutineExceptionHandler { _, _ ->  }) {
            throw NullPointerException()
        }

        delay(500)
        println("( Process.kt:13 )    ")
    }

    println("( Process.kt:16 )    finish")
}
复制代码
  1. 必须添加CoroutineExceptionHandler处理异常, 否则异常依然会向上传递取消父协程
  2. 直接创建 SupervisorJob() 对象传入作用域中会导致该作用域和父协程生命周期不统一的问题, 即父协程取消以后该子协程依然处于活跃状态, 故需要指定参数为coroutineContext[Job]即传入父协程的作业对象.

监督作业在withContextasync中添加无效

直接创建一个向下传递的作用域

public suspend fun <R>  supervisorScope(block: suspend CoroutineScope.() -> R): R
复制代码
  • 该函数属于阻塞
  • supervisorScope函数使用的依然是当前作用域的Job, 所以跟随当前作用域生命周期, 可以被取消.
  • 具备返回值
fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {
        
      // 在该作用域内只要设置CoroutineExceptionHandler都仅会向下传递
        supervisorScope {
            launch(CoroutineExceptionHandler { _, _ ->  }) {
                throw NullPointerException()
            }

            launch {
                delay(1000) // 即使上面的launch抛出异常也会继续执行这里
                println("( Process.kt:18 )    ")
            }
        }
    }

    println("( Process.kt:16 )    finish")
}
复制代码

线程不安全

解决线程不安全问题

  1. 单协程上下文操作
  2. 互斥锁
  3. 切换线程实现单线程
  4. Channel

互斥

相当于Java中的Lock替代品: Mutex

创建互斥对象

public fun Mutex(locked: Boolean = false): Mutex
// `locked`是否立即上锁
复制代码

使用扩展函数可以自动加锁和解锁

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner 钥匙
复制代码

Channel

通过在不同的协程内部使用Channel实例可以数据通讯. 通道可以连续顺序传输N个元素

早期Channel计划实现类似RxStream的框架, 但是后面被Flow替代. Channel和Flow的关系类似于一个是线程安全(ChannelFlow)一个是线程不安全的(Flow).

多个协程允许发送和接收同一个通道数据

Channel属于接口无法直接创建, 我们需要通过函数Channel()来创建其实现类

源码

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel() // 无缓存
        UNLIMITED -> LinkedListChannel() // 无限制
        CONFLATED -> ConflatedChannel()  // 合并
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
        else -> ArrayChannel(capacity) // 指定缓存大小
    }
复制代码
  • capacity

    缓冲大小, 默认0
    当Channel发送一条数据时就会挂起通道(不继续执行发送后续代码), 只有在接收这条数据时才会解除挂起继续执行. 但是我们可以设置缓存大小
    复制代码

通道允许被遍历获取当前发送数据

val channel = Channel<Int>()

for (c in channel){

}
复制代码
public suspend fun yield(): Unit
复制代码

Channel

Channel接口同时实现发送渠道(SendChannel)和接收渠道(ReceiveChannel)两个接口, 所以既能发送又能接收数据.

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 
复制代码

SendChannel

public val isClosedForSend: Boolean

public suspend fun send(element: E)
// 发送消息

public fun offer(element: E): Boolean

public fun close(cause: Throwable? = null): Boolean
// 关闭发送通道

public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)

public val onSend: SelectClause2<E, SendChannel<E>>
复制代码
  • 发送通道关闭后不能继续使用ReceiveChannel接收数据, 会导致ClosedReceiveChannelException抛出

ReceiveChannel

public val isClosedForReceive: Boolean
// SendChannel是否已经关闭通道, 如果关闭通道以后还存在缓存则会接收完缓存之后返回false

public val isEmpty: Boolean

public suspend fun receive(): E
// 接受当前被发送的消息

public val onReceive: SelectClause1<E>
// 监听事件发送
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// 会抛出异常, 不推荐使用

public val onReceiveOrNull: SelectClause1<E?>
// Null表示通道已关闭

public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`对象可以判断通道是否已关闭

public fun poll(): E?

public fun cancel(cause: CancellationException? = null)
// 取消
复制代码
  1. 通道的发送和接收都会导致作用域被阻塞, 但是发送消息可以通过设置缓存让他不阻塞, 或者取消通道可以让阻塞继续
  2. 通道只允许在挂起函数中发送和接收, 但是创建通道不限制
  3. 如果不关闭通道, 其所在作用域不会被结束
  4. 通道执行close函数后不允许再发送或者接收数据, 否则抛出异常
  5. 通道的send|receive函数所在作用域被取消cancel不会导致通道结束(isClosedForReceive返回false)

consume

public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R
复制代码

BroadcastChannel

这个通道和一般的通道区别在于他的每个数据可以被每个作用域全部接收到. 默认的通道一个数据被接收后其他的协程是无法再接收到数据的

广播通道通过全局函数创建对象

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>
复制代码

本身广播通道继承自SendChannel, 只能发送数据, 通过函数可以拿到接收通道

public fun openSubscription(): ReceiveChannel<E>
复制代码

取消通道

public fun cancel(cause: CancellationException? = null)
复制代码

将Channel转成BroadcastChannel

fun <E> ReceiveChannel<E>.broadcast(
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E>
复制代码

通过扩展函数在协程作用域中快速创建一个广播发送通道

public fun <E> CoroutineScope.broadcast(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> 
复制代码

迭代通道

接收通道实现操作符重载可以使用迭代

public operator fun iterator(): ChannelIterator<E>
复制代码

示例

for (i in produce){
	// 收到每个发型的消息
}
复制代码

当多个协程接收同一个渠道数据会依次轮流接收到数据, 渠道对于多个协程是公平的

Produce

上面介绍的属于创建Channel对象来发送和接收数据, 但是还可以通过扩展函数快速创建并返回一个具备发送数据的ReceiveChannel对象

public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
复制代码
  • context: 可以通过协程上下文决定调度器等信息
  • capacity: 初始化通道空间

ProducerScope 该接口继承自SendChannel以及CoroutineScope, 具备发送通道数据以及协程作用域作用.

当produce作用域执行完成会关闭通道, 前面已经提及关闭通道无法继续接收数据

等待取消

该函数会在通道被取消时回调其函数参数. 前面提及协程取消时可以通过finally来释放内存等操作, 但是通道取消无法使用finally只能使用该函数.

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) 

// [SendChannel.close] or [ReceiveChannel.cancel] 代表取消通道
复制代码

Actor

可以通过actor函数创建一个具备渠道作用的协程作用域

public fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
复制代码
  • context: 协程上下文
  • capacity: 通道缓存空间
  • start: 协程启动模式
  • onCompletion: 完成回调
  • block: 回调函数中可以进行发送数据

该函数和produce函数相似,

  1. produce返回ReceiveChannel, 外部协程只能进行数据接收
  2. actor返回的SendChannel, 外部协程只能进行数据发送
  3. actor的回调函数拥有属性channel:Channel, 既可以发送数据又可以接收数据, produce的属性channel属于SendChannel
  4. 无论是produce或者actor他们的通道都属于Channel, 既可以发送又可以接收数据, 只需要类型强转即可.
  5. 本身Channel可以进行双向数据通信, 但是设计produce和actor属于设计思想中的生产者和消费者模式
  6. 他们都属于协程作用域和数据通道的结合

轮循器

无论是RxJava还是协程都支持轮循器的功能, 在我的网络请求库中还赋予了轮循器暂停|继续|多个观察者|重置等功能

这里的协程轮循器就比较简陋

public fun ticker(
    delayMillis: Long,
    initialDelayMillis: Long = delayMillis,
    context: CoroutineContext = EmptyCoroutineContext,
    mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit>
复制代码

该通道返回的数据是Unit

默认情况下可以理解为通道会在指定间隔时间后一直发送Unit数据

fun main() = runBlocking<Unit> {
  
    val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)

  // 每秒打印
    for (unit in tickerChannel) {
        System.err.println("unit = $unit")
    }
}
复制代码

但是如果下游不是在发送数据以后立即接收数据, 而是延迟使用receive函数来接收通道数据

TickerMode该枚举拥有两个字段

  • FIXED_PERIOD 默认值, 动态调节通道发送数据的时间间隔, 时间间隔可以看做是上游发送数据的
  • FIXED_DELAY 只有当接收数据后才会开始计算间隔时间, 时间间隔可以看做是下游接收数据的

这个轮循器不支持多订阅|暂停|继续|重置|完成, 但是我的Net库中Interval对象已实现该功能.

Select

select闭包中可以创建多个协程作用域或者通道, 且只会执行最快接收数据的通道或者结果.

通道

@InternalCoroutinesApi
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> {
    a.onReceiveOrNull {
        if (it == null) "Channel 'a' is closed"
        else "a -> '$it.valueOrNull'"
    }

    b.onReceiveOrNull {
        if (it == null) "Channel 'b' is closed"
        else "b -> '$it.valueOrNull'"
    }
}

@InternalCoroutinesApi
fun main() = runBlocking<Unit> {

    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }

    val b = produce<String> {
        repeat(4) { send("World $it") }
    }

    repeat(8) {
        // 打印最早的八个结果
        println(selectAorB(a, b))
    }

    coroutineContext.cancelChildren()
}
复制代码

Flow

Flow不属于挂起函数可以在任意位置创建. 默认执行在当前协程上下文中.

Flow和RxJava很相似, 分为上游和下游及中间操作符, 但是Flow内部属于协程作用域, 其调度器依靠挂起函数切换异步.

JetBrains公司也承认Flow属于参考Reactive Stream等框架的产物, 这样我相信很多人就能理解Flow的存在意义了. 这种上下游观察者模式在ROOM官方数据库中同样支持.

Flow的操作符不如RxJava丰富, 但是Flow的开发时间还很短还未正式完成. 后面可以跟进

创建Flow

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
复制代码

示例

fun shoot() = flow {
    for (i in 1..3) {
        delay(1000) // 假装我们在这里做了一些有用的事情
        emit(i) // 发送下一个值
    }
}
复制代码
  • 集合或者Sequence都可以通过asFlow函数转成Flow对象

  • 也可以像创建集合一样通过fowOf直接创建Flow对象

  • Channel通道转成Flow

    public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow
    复制代码
  • 甚至挂起函数也可以转成Flow

    public fun <T> (suspend () -> T).asFlow(): Flow<T>
    复制代码

Flow和协程

collect和flow的回调函数本身属于suspend函数可以开启协程作用域

Flow发射函数emit不是线程安全的不允许其他线程调用, 如果需要线程安全请使用channelFlow而不是flow

public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T>
复制代码

和RxJava一样理解基本用法和思想之后就仅需要熟悉不同的操作符了. 基本的操作符每个ReactiveStream框架都是雷同.

下面介绍Flow的函数(操作符)

Collect

收集数据

Flow是冷数据, 要求调用函数collect收集数据时才会进行数据的发射. 该系列函数也成为末端操作符.

flow {
  emit(23)
}.collect {
	System.err.println("(Demo.kt:9)    it = $it")
}
复制代码

查看源码会发现这个emit实际上就是执行collect的参数函数...

collect函数表示接收上游发送的数据

public suspend fun Flow<*>.collect() 
// 不做任何处理的收集器, 仅仅为了触发发射数据

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit
// 收集

public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
// 和上个函数的区别是: 如果在下游没有处理完情况下上游继续下个发射会导致上次的下游被取消

public suspend inline fun <T> Flow<T>.collectIndexed(
      crossinline action: suspend (index: Int, value: T) -> Unit
): Unit
// 具备索引和值的收集器

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 将Flow运行在指定的协程作用域内
复制代码

FlowCollector: 发射器

public suspend fun emit(value: T)
// 发送一个数据

public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
// 发射另一个flow对象
复制代码

Context

调度器

Flow默认使用的是其所在的当前线程或者协程上下文, Flow不允许在内部使用withContext来切换调度器, 而是应该使用flowOn函数

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
复制代码

该函数改变的是Flow函数内部发射时的线程, 而在collect收集数据时会自动切回创建Flow时的线程.

public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T>

public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T>
复制代码

缓存

不需要等待收集执行就立即执行发射数据. 只是数据暂时被缓存而已. 提高性能.

默认切换调度器时会自动缓存

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>
复制代码

合并函数, 这个函数实际上就是buffer

当下游无法及时处理上游的数据时会丢弃掉该数据

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
复制代码

Zip

将多个事件合并后发送给下游

zip

将两个Flow在回调函数中进行处理返回一个新的值 R

当两个flow的长度不等时只发送最短长度的事件

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
复制代码

示例

val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
    .collect { value -> // 收集并打印
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
复制代码

combine

public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, 
                                        transform: suspend (a: T1, b: T2) -> R): Flow<R>

public fun <T1, T2, R> Flow<T1>.combineTransform(
    flow: Flow<T2>,
    @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>
复制代码

Collection

Flow直接转成集合函数

public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>

public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>

public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
复制代码

Reduce

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

public suspend inline fun <T, R> Flow<T>.fold(
    initial: R,
    crossinline operation: suspend (acc: R, value: T) -> R
): R
// `acc`为上次回调函数返回值, 第一次为初始值, 等同于叠加效果. 该函数和reduce的区别就是支持初始值. reduce累计两次元素才会回调函数

public suspend fun <T> Flow<T>.single(): T
// 期待只有一个元素, 否则抛出`IllegalStateException`

public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不抛出异常, 但如果不是仅有元素则返回null

public suspend fun <T> Flow<T>.first(): T
// 如果不存在一个元素则会抛出`NoSuchElementException`

public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回调函数判断为true的第一个条件符合的元素
复制代码

Merge

public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先发送所有的元素, 然后上游每个元素会导致回调函数中的Flow发送所有元素一次

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同于RxJava的FlatMap

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>

public fun <T> Flow<Flow<T>>.flattenMerge(
  concurrency: Int = DEFAULT_CONCURRENCY
): Flow<T>

public fun <T, R> Flow<T>.transformLatest(                                      
  @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

public inline fun <T, R> Flow<T>.flatMapLatest(
  @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> 

public inline fun <T, R> Flow<T>.flatMapLatest(
  @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R>
复制代码

Emitter

public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
// 转换函数, 可以在回调函数中发送新的元素

public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
// 开始

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>
// 回调函数中的参数`cause`如果为null表示正常完成没有抛出异常, 反之则抛出异常非正常结束
// 和catch函数一样只能监听到上游发生的异常, 但是无法避免异常抛出只能在异常抛出之前执行回调函数
复制代码

Limit

限制流发送

public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定数量事件

public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丢弃指定数量事件

public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回调函数判断是否丢弃或者接收, 只要丢弃或者接收后面就不会继续发送事件(结束流)
复制代码

Error

捕捉异常

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
复制代码

该函数只能捕获上游异常, 如果异常处于函数调用之下则依然会被抛出

重试

public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE, // 重试次数
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>

public fun <T> Flow<T>.retryWhen(
  predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>
复制代码

Transform

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>

public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>

public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>

public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>

public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>


public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>

public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
复制代码

Android

Google发行的Jetpack库中很多组件都附有KTX扩展依赖, 这种依赖主要是增加kotlin和协程支持

Lifecycle

官方提供生命周期协程作用域的快速创建实现.

  • 指定生命周期运行协程
  • 自动在onDestory取消协程

引入ktx依赖库

implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"
复制代码

当执行到某个生命周期时运行协程

fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job

suspend fun <T> Lifecycle.whenStateAtLeast(
    minState: Lifecycle.State,
    block: suspend CoroutineScope.() -> T
)
复制代码

这些函数都属于LifecycleLifecycleOwner的扩展函数

LiveData

依赖

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"
复制代码

提供开发者使用的只有这两个函数, 两个函数功能一样, 只是每个参数接收时间单位不一致

fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeout: Duration,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
复制代码
  • timeout: 如果liveData的没有处于活跃的观察者则在指定的时间内(单位毫秒)会取消其作用域[block]
  • block: 该作用域只在活跃状态才会触发, 默认在Dispatchers.Main.immediate调度器

liveData作用域具备发射数据和LiveData的作用

interface LiveDataScope<T> {
    /**
     * Set's the [LiveData]'s value to the given [value]. If you've called [emitSource] previously,
     * calling [emit] will remove that source.
     *
     * Note that this function suspends until the value is set on the [LiveData].
     *
     * @param value The new value for the [LiveData]
     *
     * @see emitSource
     */
    suspend fun emit(value: T)

    /**
     * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this
     * method will remove any source that was yielded before via [emitSource].
     *
     * @param source The [LiveData] instance whose values will be dispatched from the current
     * [LiveData].
     *
     * @see emit
     * @see MediatorLiveData.addSource
     * @see MediatorLiveData.removeSource
     */
    suspend fun emitSource(source: LiveData<T>): DisposableHandle

    /**
     * References the current value of the [LiveData].
     *
     * If the block never `emit`ed a value, [latestValue] will be `null`. You can use this
     * value to check what was then latest value `emit`ed by your `block` before it got cancelled.
     *
     * Note that if the block called [emitSource], then `latestValue` will be last value
     * dispatched by the `source` [LiveData].
     */
    val latestValue: T?
}
复制代码
  1. 如果emitSource在emit之前执行则无效
  2. 该作用域会在每次处于活跃状态时都执行一遍, 如果将应用从后台切换到前台则会返回执行该作用域, 但是观察者只会在活跃时才收到数据
关注下面的标签,发现更多相似文章
评论