阅读 270

CPS 与 Kotlin coroutine

Continuation Passing Style

在异步编程中,由于无法拿到实时的结果,我们往往会通过设置回调的方式来处理执行的结果。

fun doSomethingAsync(param1: Int, param2: Any, callback: (Any?) -> Unit) {
    // ...
    // when execution is done
    callback.invoke(result)
}
复制代码

假设我们约定一种编程规范,所有的函数都按照上述的方式来定义,即所有的函数都直接返回结果值,而是在参数列表最后的位置传入一个 callback 函数参数,并在函数执行完成时通过这个 callback 来处理结果,这种代码风格被称为延续传递风格(Continuation Passing Style)。这里的回调函数(callback)被称为是延续(continuation),即这个回调函数决定了程序接下来的行为,整个程序的逻辑就是通过一个个的延续而拼接在一起。

In functional programming, continuation-passing style (CPS) is a style of programming in which control is passed explicitly in the form of a continuation. (from Wikipedia.)

CPS 的优点

我们在实现异步逻辑的时候会自然而然的采用类似 CPS 的方式,这是因为我们不知道什么时候可以处理方法的结果,所以把控制逻辑传给了要调用的方法,让该方法自己在执行完成后去主动调用。我们原本必须遵守顺序执行的控制逻辑,但是 CPS 给了我们一个机会可以去自定义控制逻辑。那么自定义控制逻辑可以做哪些事情呢?让我们来看一个例子。

构建单线程事件循环模型

我们来增加一点规则:每次调用函数并传入 callback 后,先将 callback 转换成 EventCallback。EventCallback 会将 callback 放入一个单线程线程池中去执行,示例代码如下所示。

val singleThreadExecutor = Executors.newSingleThreadExecutor()

fun ((Any?) -> Unit).toEventCallback(): ((Any?) -> Unit) {
    return fun(result: Any?) {
        singleThreadExecutor.submit {
            this.invoke(result)
        }
    }
}

fun doSomething(param1: Any, callback: (Any?) -> Unit) {
    var result: Any? = null
    // ...
    // when execution is done
    callback.toEventCallback().invoke(result)
}
复制代码

对于一些需要耗时等待的操作(例如 IO 操作),我们可以定义一些特殊的函数,在这些函数里具体逻辑被放到一个特定的线程池中去执行,待操作完成后再返回事件线程,这样可以保证我们的事件线程不被阻塞。

val IOThreadPool = Executors.newCachedThreadPool()

fun doSomethingWithIO(param1: Any, callback: (Any?) -> Unit) {
    IOThreadPool.submit {
        var result: Any? = null
        // ...
        // when io operation is done
        callback.toEventCallback().invoke(result)
    }
}
复制代码

这样我们实际建立了一个与 Node.js 类似的单事件循环+异步IO的执行模型,可以看到通过使用 CPS 的方式我们可以更灵活的处理返回值,例如选择恰当的时机或者是做拦截操作。

CPS 的缺点

Callback Hell

在普通的执行模型中,如果我们需要多个前提值来计算一个最终结果,那么我们只需要顺序计算每个值,然后在计算结果,每个前提值的计算过程都是平级的,但是在 CPS 中,执行顺序是通过回调传递的,所以我们不得不每个值的计算作为一个回调嵌套到另一个值的计算过程中,这就是所谓的 Callback Hell,这样的代码会导致难以阅读。

// Normal
val a = calculateA()
val b = calculateB()
val c = calculateC()
// ...
val result = calculateResult(a, b, c/*, ...*/)

// CPS
fun calculateResult(callback: (Any?) -> Unit) {
    calculateA(fun(a: Any?) {
        calculateB(fun(b: Any?) {
            calculateC(fun(c: Any?) {
                //...
                callback.invoke(calculate(a, b, c/*, ...*/)
            }
        }
    }
}
复制代码

栈空间占用问题

在类似 C 和 Java 这样的语言里,每次函数调用会为该函数分配对应的栈空间,用来存放函数参数,返回值和局部变量的信息,然后在函数返回之后再释放这部分空间。而在 CPS 模型中,我们可以看到,回调是在函数执行完成前被调用的,所以在进入回调函数之后外面的函数的栈空间并不会被释放,这样程序很容易出现栈空间溢出的问题。

CPS 中的回调其实具有一些特殊性,即总是作为函数执行的最后一个步骤(代替普通流程中的返回值),所以这个时候外层函数的值并不会再被访问,这种情况其实是尾递归调用的一种表现。在绝大多数的函数式语言中,系统都会对尾递归进行优化,回收外层函数的栈空间。但是在 C 和 Java 中并没有这样的优化。

Kotlin coroutine

Kotlin coroutine 本质上就是利用 CPS 来实现对过程的控制,并解决了一些用 CPS 时会产生的问题。

suspend 关键字

Kotlin 中 suspend 函数的写法与普通函数基本一致,但是编译器会对标有 suspend 关键字的函数做 CPS 变换,这解决了我们提到的 callback hell 的问题:我们依然可以按照普通的顺序执行流程来写代码,而编译器会自动将其变为 CPS 的等价形式。

另外,为了避免栈空间过大的问题,kotlin 编译器实际上并没有把代码转换成函数回调的形式,而是利用了状态机模型。Kotlin 把每次调用 suspend 函数的地方称为一个 suspension point,在做编译期 CPS 变换的时候,每两个 suspension point 之间可以视为一个状态,每次进入状态机的时候会有一个当前的状态,然后会执行该状态对应的代码,如果这时程序执行完毕,则返回结果值,否则返回一个特殊的标记,表示从这个状态退出并等待下次进入。这样相当于实现了一个可复用的回调,每次都使用这个回调然后根据状态的不同执行不同的代码。

流程控制

同我们上面控制回调执行的例子一样,kotlin 也可以对 suspend 函数进行控制,实现的方式是通过 CoroutineContext 类。在每个 suspend 函数执行的地方都会有一个对应的 CoroutineContext,CoroutineContext 是一个类似单向链表的结构,系统回去遍历这个链表并根据每个元素对 suspend 函数执行的流程进行控制,例如我们可以通过 Dispatcher 类控制函数执行的线程,或者通过 Job 类来 cancel 当前的函数执行。我们可以使用 coroutine 来重写一下我们上面定义的模型:

class SingleLoopEnv: CoroutineScope {
		
override val coroutineContext: CoroutineContext = 
        Executors.newSingleThreadExecutor().asCoroutineDispatcher()

    suspend fun doSomething(param1: Any?): Any? {
        var result: Any? = null
        // ...
        // when execution is done
        return result
    }

    fun doSomethingWithIO(param1: Any?): Deferred<Any?> = 
            GlobalScope(Dispatchers.IO).async {
        var result: Any? = null
        // ...
        // when io operation is done
        return result
    }

    fun main() = launch {
        val result = doSomething(null)
        // handle result
        // ...

        val ioResult = doSomethingWithIO(null).await()
        // handle io result
        // ...
    }
}
复制代码

总结

像 Kotlin 提供一些其它机制一样,coroutine 其实也是一种语法糖,但是这是一种比较高级的语法糖,它改变了我们代码的执行逻辑,使得我们可以更好的利用 CPS 这一函数式编程的思想,去解决复杂的异步编程问题。

Article by Orab

关注下面的标签,发现更多相似文章
评论