初识Akka Stream

5,430 阅读4分钟

背景

响应式编程

响应式编程目前在编程领域都算式流行词汇了,作为Scala的创造公司Lightbend 公司(前身是 Typesafe)发起了响应式宣言(Reactive Manifesto)。Akka、Rx系列甚至Spring最新版本的WebFlux都加入到这场流行文化中。

响应式编程用一句话总结可能也无法马上理解,其实它本身也没有新的东西,主要式关注基于事件驱动的快速异步响应,当然为达到目的也得考虑如何容错(响应式四准则:灵敏性、伸缩性、容错性、事件驱动)。简单可以类比下我们常用MVC模式,Model的变化会通知View层的快速改变(事件通知),而无需View不停的去查询Model层的变化(轮询)。这就是一种通过事件机制快速作出响应,当然响应式模式不只简单的关注事件驱动和快速响应,也关注应用的对不同负载的伸缩扩展,还有在异步模式下的高容错性。

响应式编程准则

响应流

响应式编程中有个核心的问题要处理,就是响应流。Netflix、Pivotal、Typesafe等公司的工程师们在2013年共同发起了关于制定“响应式流规范(Reactive Stream Specification)”的倡议。其中描述了响应流的特点:

  • 具有处理无限数量的元素的能力
  • 按序处理
  • 异步地传递元素
  • 必须实现非阻塞的背压

Akka Stream就完全实现了“响应式流规范(Reactive Stream Specification)”。

任何东西都可以成为一个响应流,例如变量、用户输入、属性、缓存、数据结构等等。你可以创建、合并、过滤响应流,一个响应流可以作为另一个响应流的输入,甚至多个响应流也可以作为另一个响应流的输入。举个例子我们要从一个点击的响应流中处理双击或着多次点击事件:

点击响应流

背压

背压(backpressure)是为了解决响应流源和接收者处理速度不一致而采取的一种由处理者告诉上游进行速度调整的一种方式。在没有背压的情况下,响应流可能出现如下情况:

没有背压
那么使用背压有两种,一种是源发送数据过快,另一种是发送过忙。接收一方都会往上汇报以采取合适的发送速度。当然具体的处理有多种策略,这里不细讲,如下有个简单的例子:
背压
背压
实际上背压很早就出现了,以前背压通常通过阻塞生产者来实现,而等待消费者以自己的速度处理消息。这种依赖于系统间消息同步处理的方法是非常低效的,并且否定了异步处理的优势(更大的可扩展性和更好的资源利用率),因此需要一种用于实现背压的非阻塞解决方案。在响应流的背景下,背压是异步处理模型的一个组成部分,并通过异步消息传递来实现。

开始

我们开始写一些Akka Stream的相关代码来了解它。先建立一个sbt工程,在build中加入:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"

为了能够运行所有的Stream,我们先加入两个ActorSystemActorMaterializer的隐式变量:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object MyFirstStream {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyActorSystem")
    implicit val materializer = ActorMaterializer()
    
  }
}

构建Stream基础构件

Akka Stream包含了三大基础构件Source、Sink、Flow。

Source

Source
Source即响应流的源头,源头具有一个数据出口,如上图比较形象的描述了Source。我们可以通过各种数据来创建一个Source:

val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again")

Sink

Sink
Sink就是流的最终目的地,包含一个数据入口,我们可以如下来创建Sink:

val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head
val sinkNoop = Sink.ignore

Flow

Flow
Flow就是流的中间组件,包含一个数据入口和数据出口。我们可以这样来创建Flow:

val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // 当buffer满了后进行背压

定义流

流可以通过基础组件构成的图和网络来表示,我们先从最简单的方式来定义,将一个Source和Sink连起来就可以形成一个流:

Source Sink

val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)

这里面的Keep.right说明我们只关心Sink最后得到的值。

我们可以在者中间加上Flow,形成一个稍微复制的流:

Source Flow Sink

val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)

执行流

我们现在可以使用run方法执行前面创建的流,结果会放到Future中。

val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // 打印出6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // 打印出12

我们可以使用更简单的方式来定义并执行流,不需要中间量:

// 使用指定的sink来执行流
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// 使用Fold所有元素的sink来执行流
Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)