Overview
Structured Streaming is a distributed stream-processing engine built on top of Spark SQL. Its main characteristics are:
- You can write streaming logic directly with the DataFrame/Dataset API, including streaming aggregations, event-time windows, stream-to-batch joins, and so on
- End-to-end exactly-once semantics, guaranteed through checkpointing and write-ahead logs (WAL)
- End-to-end processing with latencies as low as about 100 milliseconds
- High scalability and fault tolerance
Core Concepts
Structured Streaming treats a continuously flowing data stream as an unbounded table that rows keep being appended to. When you run a query against it, the engine generates a "result table" according to your code, and at each trigger interval the newly arrived rows are processed and the result table is updated. Of course, Structured Streaming never materializes the entire input table: it only reads the latest data, updates the result, and then discards the raw input, keeping just the minimal intermediate state needed to keep the result up to date. This intermediate state will be covered in detail in a later article.
Below is the conceptual diagram from the official documentation, which should make this easier to picture:
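One more detail worth knowing before the example: how much of the result table is emitted at each trigger is controlled by the output mode. Here is a minimal sketch of the three modes, assuming a streaming aggregation such as the wordCounts DataFrame from the WordCount example below:

// "complete": rewrite the entire result table on every trigger (aggregation queries only)
// "update":   write only the result rows that changed since the last trigger
// "append":   write only rows that are final and will never change again
wordCounts.writeStream
.outputMode("complete") // or "update" / "append"
.format("console")
.start()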
Code Demo
That's a lot of description, so let's go straight to the code. This example is the classic WordCount, written in Scala 2.11.8. If you haven't touched Scala before, you can check out my Scala series first.
First, edit pom.xml and add the following dependencies:
<properties>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>2.4.4</version>
</dependency>
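If you build with sbt instead of Maven, the equivalent dependency would look roughly like this (same Scala and Spark versions as above; the rest of the sbt setup is assumed):

// build.sbt
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"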
Next, create a SparkSession as the entry point. This also underlines that Structured Streaming is built on the Spark SQL engine: you can use the Dataset/DataFrame API for stream processing.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[2]")
.getOrCreate()
Finally comes the familiar three-step pattern: source -> transform -> sink. We read lines from a socket, do the word count, and write the results to the console.
import spark.implicits._

// Source: read lines from a socket
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", "9999")
.load()

// Transform: split each line into words and count the occurrences of each word
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Sink: write the full result table to the console after every trigger
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()

query.awaitTermination()
Since my development environment is a Mac, I can simply run nc -lk 9999, type in some test data, and the results show up on the console as below. If you're not on a Mac, a cloud host works just as well; just change the host option!
$ nc -lk 9999
hadoop hadoop flink
spark spark
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| flink| 1|
|hadoop| 2|
+------+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| spark| 2|
| flink| 1|
|hadoop| 2|
+------+-----+
Digging into the Source Code
If you've used Spark SQL, you'll probably feel the same way I do: the entry point is the same SparkSession and the code looks very similar. So what is different in the source code when it comes to loading an external data source?
Let's first look at how Spark Structured Streaming loads a data source. Opening the source of spark.readStream, we find it essentially just does new DataStreamReader(self), so that is the class we'll dig into.
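For reference, in Spark 2.4 that method on SparkSession is roughly just:

def readStream: DataStreamReader = new DataStreamReader(self)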
The first thing to do when reading source code is to read its doc comment:
/**
* Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
* key-value stores, etc). Use `SparkSession.readStream` to access this.
*
* @since 2.0.0
*/
@InterfaceStability.Evolving
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
From the comment we know this class loads a streaming Dataset from external storage systems, much like the batch-side reader. Note that its constructor is private[sql], so it can only be instantiated inside the org.apache.spark.sql package; user code has to go through SparkSession.readStream rather than creating one directly.
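As a quick illustration (hypothetical user code, not from the Spark codebase) of what that restriction means in practice:

// Does NOT compile outside the org.apache.spark.sql package,
// because the constructor is private[sql]:
// val reader = new DataStreamReader(spark)

// The supported entry point is the factory method on SparkSession:
val reader = spark.readStream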
The second thing is to find an entry point. Normally that would be a main method, but this class doesn't have one. From the earlier example we know the last call in the chain is load, so we start reading from that method.
/**
* Loads input data stream in as a `DataFrame`, for data streams that don't require a path
* (e.g. external key-value stores).
*
* @since 2.0.0
*/
def load(): DataFrame = {
// `source` is a field holding the data source format; it is set via format() and defaults to parquet
// The source cannot be set to hive directly, otherwise an AnalysisException is thrown
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}
val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
lookupDataSource is a fairly important method: it resolves both built-in and user-defined data sources. It first tries to obtain the context class loader, falling back to Spark's class loader, then looks for the requested source among the classes that implement the DataSourceRegister interface; if no registered alias matches, it tries to load the class directly. The main logic is as follows:
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
classOf[OrcFileFormat].getCanonicalName
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
"org.apache.spark.sql.hive.orc.OrcFileFormat"
case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
"org.apache.spark.sql.avro.AvroFileFormat"
case name => name
}
val provider2 = s"$provider1.DefaultSource"
// Try the context class loader first, fall back to Spark's class loader
val loader = Utils.getContextOrSparkClassLoader
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
try {
// Look for the requested source among classes implementing DataSourceRegister
serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
// the provider format did not match any given registered aliases
case Nil =>
try {
// No registered alias matched, try to load the class directly
Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
case Success(dataSource) =>
// Found the data source using fully qualified path
dataSource
case Failure(error) =>
....
}
} catch {
.....
}
case head :: Nil =>
// there is exactly one registered alias
head.getClass
case sources =>
....
}
} catch {
....
}
}
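To make the ServiceLoader branch above concrete, here is a minimal hypothetical sketch of how a custom source gets registered: the provider class implements DataSourceRegister, and its fully qualified name is listed in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath (the class name and alias below are made up for illustration):

import org.apache.spark.sql.sources.DataSourceRegister

// A real streaming source would additionally implement StreamSourceProvider (V1)
// or the V2 read-support interfaces; only the registration part is shown here.
class MyDemoSourceProvider extends DataSourceRegister {
  // lookupDataSource matches this alias against the string passed to format(...)
  override def shortName(): String = "my-demo"
}

// User side: spark.readStream.format("my-demo").load()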
Back in the load implementation, reading further we see that the code always builds the V1 DataSource and passes its relation into the V2 relation as a shim: at this point it cannot yet tell whether the query will actually run on V2 (it doesn't know the writer or whether the query is continuous), so for non-continuous micro-batch execution it can still fall back to V1.
// We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
// We can't be sure at this point whether we'll actually want to use V2, since we don't know the
// writer or whether the query is continuous.
val v1DataSource = DataSource(
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
val v1Relation = ds match {
case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
case _ => None
}
ds match {
case s: MicroBatchReadSupport =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = s, conf = sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
val dataSourceOptions = new DataSourceOptions(options.asJava)
var tempReader: MicroBatchReader = null
val schema = try {
tempReader = s.createMicroBatchReader(
Optional.ofNullable(userSpecifiedSchema.orNull),
// temporary checkpoint path, used for failure recovery
Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
dataSourceOptions)
tempReader.readSchema()
} finally {
// Stop tempReader to avoid side-effect thing
if (tempReader != null) {
tempReader.stop()
tempReader = null
}
}
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
s, source, options,
schema.toAttributes, v1Relation)(sparkSession))
case s: ContinuousReadSupport =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = s, conf = sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
val dataSourceOptions = new DataSourceOptions(options.asJava)
val tempReader = s.createContinuousReader(
Optional.ofNullable(userSpecifiedSchema.orNull),
// temporary checkpoint path, used for failure recovery
Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
dataSourceOptions)
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
s, source, options,
tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
case _ =>
// Code path for data source v1.
Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
}
}
Having read through the code above, let me leave you with a few questions:
- What are the differences between Data Source V1 and V2?
- What do MicroBatchReadSupport and ContinuousReadSupport each represent, and how do they differ?
- What are StreamingRelation and StreamingRelationV2 used for?
Feel free to discuss these in the comments. If anything above is wrong or you have better suggestions, please leave a comment as well; I'll answer these questions at the beginning of the next post.