Spark Structured Streaming Series (1): A First Look

Overview

Structured Streaming is a distributed stream processing engine built on Spark SQL. Its main characteristics are:

  • Streaming logic is written directly with the DataFrame/Dataset API, for example streaming aggregations, event-time windows, and stream-to-batch joins
  • End-to-end exactly-once semantics, guaranteed through checkpointing and write-ahead logs (WAL); a minimal sketch of enabling this follows the list
  • Low end-to-end latency (on the order of 100 ms)
  • High scalability and fault tolerance
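To make the checkpoint/WAL bullet concrete, here is a minimal sketch of where that guarantee is switched on in user code: the checkpointLocation option on the write side. Everything other than the option itself is an assumption for illustration (the built-in rate test source, the local path, the app name); in a real job the checkpoint directory should live on reliable storage such as HDFS.

import org.apache.spark.sql.SparkSession

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CheckpointSketch")   // hypothetical app name
      .master("local[2]")
      .getOrCreate()

    // The built-in "rate" test source emits (timestamp, value) rows at a fixed speed.
    val stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()

    stream.writeStream
      .format("console")
      .outputMode("append")
      // Offsets (the write-ahead log) and operator state are persisted here,
      // which is what makes exactly-once recovery after a failure possible.
      .option("checkpointLocation", "/tmp/structured-streaming-ckpt")
      .start()
      .awaitTermination()
  }
}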

Core Concepts

Structured Streaming treats a continuously arriving stream as a table into which rows are endlessly inserted, i.e. an unbounded input table. When a query is run against it, Structured Streaming derives a "result table" from your code, and at each trigger interval the newly arrived rows update that result table. Of course, Structured Streaming never materializes the entire input table: it only reads the latest data, uses it to update the result, and then discards the source rows, keeping just the minimal intermediate state needed to maintain the result. This intermediate state will be covered in detail in a later article.
The conceptual diagrams from the official documentation below should make this easier to picture:

[Figure: the data stream viewed as an unbounded input table (structured-streaming-stream-as-a-table.png)]

[Figure: the Structured Streaming programming model (structured-streaming-model.png)]
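Which part of the result table actually reaches the sink at each trigger is controlled by the output mode. Below is a small illustrative sketch, assuming a SparkSession named spark like the one created in the demo further down; the rate source and the bucket grouping key are made up for illustration and are not part of the word-count example.

import org.apache.spark.sql.functions.col

// Aggregate the built-in rate source; "bucket" is just an arbitrary grouping key.
val counts = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .groupBy((col("value") % 10).as("bucket"))
  .count()

// "complete": the entire result table is rewritten to the sink on every trigger.
// "update" would emit only the result-table rows that changed in this trigger,
// and "append" only suits queries whose emitted rows never change afterwards.
val q = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

q.awaitTermination()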

Code Demo

Enough description; let's go straight to the code. The example is the classic WordCount, written in Scala (version 2.11.8). If you have never touched Scala before, you can take a look at my Scala series first.
First, add the following to pom.xml:

<properties>
  <scala.version>2.11.8</scala.version>
  <scala.binary.version>2.11</scala.binary.version>
</properties>    

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.binary.version}</artifactId>
  <version>2.4.4</version>
</dependency>

Next, create a SparkSession as the entry point. This also underlines that Structured Streaming is built on the Spark SQL engine: you write stream processing with the Dataset/DataFrame API.

val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[2]")
      .getOrCreate()

Finally comes the familiar three-step routine, source -> transform -> sink: we read data from a socket, run the word count on it, and write the result to the console.

import spark.implicits._
val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

query.awaitTermination()

Since my development machine is a Mac, I simply run nc -lk 9999, type some test data, and the results appear on the console as shown below. If you are not on a Mac, a cloud host works just as well; just change the host option.

$ nc -lk 9999
hadoop hadoop flink
spark spark
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| flink|    1|
|hadoop|    2|
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| spark|    2|
| flink|    1|
|hadoop|    2|
+------+-----+
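The query above uses the default trigger: a new micro-batch starts as soon as the previous one finishes. If you want a fixed batch interval, set it explicitly on the write side; the sketch below reuses the wordCounts Dataset from the demo, and the 5-second interval is just an arbitrary example.

import org.apache.spark.sql.streaming.Trigger

// Same word-count query, but with an explicit 5-second micro-batch interval.
val timedQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

timedQuery.awaitTermination()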

Digging into the Source Code

If you have used Spark SQL, this will feel familiar: the same SparkSession entry point and very similar code. So how does loading an external data source differ under the hood?
Let's first look at how Structured Streaming loads a data source. Opening the source of spark.readStream, we find that it essentially just does new DataStreamReader(self), so that class is what we will examine.
The first thing to do when reading source code is to read its doc comment:

/**
 * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
 * key-value stores, etc). Use `SparkSession.readStream` to access this.
 *
 * @since 2.0.0
 */
@InterfaceStability.Evolving
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {

The comment tells us that this class is used to load a streaming Dataset from external storage systems. Also note that the constructor is private[sql], so the class can only be instantiated inside the org.apache.spark.sql package; user code obtains an instance through SparkSession.readStream instead of calling new directly.
The second step is to find the entry point. Usually that would be a main method, but this class has none. From the earlier example we know the last call in the chain is load, so let's start reading there.

/**
   * Loads input data stream in as a `DataFrame`, for data streams that don't require a path
   * (e.g. external key-value stores).
   *
   * @since 2.0.0
   */
  def load(): DataFrame = {
    // source is a field of this class holding the data source format; it is set via format()
    // and defaults to spark.sql.sources.default (parquet). It cannot be set to "hive" directly,
    // otherwise the AnalysisException below is thrown.
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()

lookupDataSource is a fairly important method: it resolves both built-in and externally defined data sources. It first tries to obtain the context class loader, falling back to the Spark class loader, then searches the classes implementing the DataSourceRegister interface for the requested source (matched by short name); if no alias matches, it tries to load the class directly by name. The main logic is as follows:

def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcFileFormat].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
        "org.apache.spark.sql.avro.AvroFileFormat"
      case name => name
    }
    val provider2 = s"$provider1.DefaultSource"
    // try the context class loader first, falling back to the Spark class loader
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

    try {
      // search the classes implementing DataSourceRegister for the requested source
      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
        // the provider format did not match any given registered aliases
        case Nil =>
          try {
            // no registered alias matched; try loading the class directly by name
            Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
              case Success(dataSource) =>
                // Found the data source using fully qualified path
                dataSource
              case Failure(error) =>
                ....
            }
          } catch {
            .....
          }
        case head :: Nil =>
          // there is exactly one registered alias
          head.getClass
        case sources =>
         ....
      }
    } catch {
      ....
    }
  }
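The ServiceLoader branch above is what makes a third-party source addressable by a short name: the provider implements DataSourceRegister and lists its fully qualified class name in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister. The skeleton below is only a hedged sketch of that registration mechanism against the V1 streaming API; the class name, the "myformat" alias, and the schema are made up, and the actual Source implementation is omitted.

package com.example.streaming   // hypothetical package

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// After registration in META-INF/services, spark.readStream.format("myformat")
// resolves to this class through lookupDataSource's shortName matching.
class MyFormatProvider extends StreamSourceProvider with DataSourceRegister {

  override def shortName(): String = "myformat"   // the alias matched above

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(StructType(StructField("value", StringType) :: Nil)))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    // The real Source (getOffset / getBatch / commit / stop) is omitted in this sketch.
    throw new UnsupportedOperationException("sketch only")
}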

Back in the load() implementation, reading on we see that, because at this point Spark cannot yet tell whether V2 will actually be used (neither the writer nor whether the query is continuous is known yet), it builds the V1 DataSource first and passes it into the V2 relation as a shim, so that execution can fall back to V1 when the query does not go through V2.

// We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
    // We can't be sure at this point whether we'll actually want to use V2, since we don't know the
    // writer or whether the query is continuous.
    val v1DataSource = DataSource(
      sparkSession,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap)
    val v1Relation = ds match {
      case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
      case _ => None
    }
    ds match {
      case s: MicroBatchReadSupport =>
        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
          ds = s, conf = sparkSession.sessionState.conf)
        val options = sessionOptions ++ extraOptions
        val dataSourceOptions = new DataSourceOptions(options.asJava)
        var tempReader: MicroBatchReader = null
        val schema = try {
          tempReader = s.createMicroBatchReader(
            Optional.ofNullable(userSpecifiedSchema.orNull),
            // used for failure recovery
            Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
            dataSourceOptions)
          tempReader.readSchema()
        } finally {
          // Stop tempReader to avoid side-effect thing
          if (tempReader != null) {
            tempReader.stop()
            tempReader = null
          }
        }
        Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, options,
            schema.toAttributes, v1Relation)(sparkSession))
      case s: ContinuousReadSupport =>
        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
          ds = s, conf = sparkSession.sessionState.conf)
        val options = sessionOptions ++ extraOptions
        val dataSourceOptions = new DataSourceOptions(options.asJava)
        val tempReader = s.createContinuousReader(
          Optional.ofNullable(userSpecifiedSchema.orNull),
          // used for failure recovery
          Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
          dataSourceOptions)
        Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, options,
            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
      case _ =>
        // Code path for data source v1.
        Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
    }
  }
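As the comment in load() says, at read time Spark cannot know whether the query will be continuous, because that is only decided on the write side through the trigger. Below is a hedged user-side sketch; note that continuous processing only supports map-like operations and, among the built-in sources, the rate and Kafka sources, and the interval value is arbitrary.

import org.apache.spark.sql.streaming.Trigger

// Only here, on the write side, does Spark learn the query is continuous,
// which is when the ContinuousReadSupport branch above becomes relevant.
val continuousQuery = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))   // the checkpoint interval for continuous mode
  .start()

continuousQuery.awaitTermination()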

After walking through load(), I would like to leave you with a few questions:

  • What is the difference between DataSource V1 and V2?
  • What do MicroBatchReadSupport and ContinuousReadSupport each represent, and how do they differ?
  • What are StreamingRelation and StreamingRelationV2 used for?

Feel free to discuss these in the comments. If anything above is wrong, or you have better suggestions, please leave a comment as well; I will answer these questions at the beginning of the next post.