Dynamically Inferring the Schema of Kafka JSON Data in Structured Streaming with schema_of_json


How to infer the schema of JSON data arriving from Kafka in Structured Streaming

In real production the fields of a message may change, for example an extra field gets added, but the Spark job cannot simply be stopped. So instead of hard-coding the schema in the program, the idea is to infer it from the JSON strings in the incoming Kafka messages. Alternatively, you could use a broadcast variable to reload a configuration file and refresh the schema periodically; that is another workable approach.

Back in Spark Streaming, parsing Kafka's JSON data also relied on schema inference, like this:

dStream.map(_.value).foreachRDD(rdd => {
  ...
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._  // provides the String encoder needed by createDataset
  val df = spark.read.json(spark.createDataset(rdd))
  ...
})

By parsing the JSON strings this way, the JSON keys directly become the column names of the DataFrame.

But Structured Streaming hands you a DataFrame directly, so that approach no longer works. Digging through the API, I found a method that infers a schema from a JSON string: schema_of_json.

/**
 * Parses a JSON string and infers its schema in DDL format.
 *
 * @param json a JSON string.
 *
 * @group collection_funcs
 * @since 2.4.0
 */
def schema_of_json(json: String): Column = schema_of_json(lit(json))

/**
 * Parses a JSON string and infers its schema in DDL format.
 *
 * @param json a string literal containing a JSON string.
 *
 * @group collection_funcs
 * @since 2.4.0
 */
def schema_of_json(json: Column): Column = withExpr(new SchemaOfJson(json.expr))

This is pretty much first-hand material: when I Googled how to use this method, all I found were two Stack Overflow threads discussing it, and even the approach given there still throws an error:

How to query JSON data column using Spark DataFrames?

Implicit schema discovery on a JSON-formatted Spark DataFrame column?

Here I created a small df to test it.

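The screenshot of that DataFrame is not available, but it can be roughly reconstructed from the field names that show up later (cardno, cardtype, flag, times, userid); the sample values below are made up for illustration:

import spark.implicits._

// Two rows, one string column "col" each holding a JSON document.
val df = Seq(
  """{"cardno":"001","cardtype":"A","flag":"0","times":"1","userid":"u1"}""",
  """{"cardno":"002","cardtype":"B","flag":"1","times":"2","userid":"u2"}"""
).toDF("col")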

Trying the second approach, df.select(schema_of_json($"col")), throws an error:

scala> df.select(schema_of_json($"col"))
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`col`)' due to data type mismatch: The input json should be a string literal and not null; however, got `col`.;;

The error message says the argument has to be a string literal, so I tried the following instead:

That is, take the JSON value out of the first Row and pass it in as a literal for the inference, and that works.
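A minimal sketch of what that attempt looked like (the exact REPL output is an assumption; the inferred struct matches the schema shown further down):

// Take the JSON string out of the first row and pass it as a literal.
val sample = df.take(1)(0).getString(0)
df.select(schema_of_json(sample) as "schema").show(false)
// expected output, roughly: struct<cardno:string,cardtype:string,flag:string,times:string,userid:string>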

The final version looks like this:

scala> df.select(schema_of_json(df.take(1)(0).get(0).toString).alias("schema")).select(regexp_extract($"schema","struct<(.*?)>",1) as "schema").show(false)

+--------------------------------------------------------------------+
|schema                                                              |
+--------------------------------------------------------------------+
|cardno:string,cardtype:string,flag:string,times:string,userid:string|
|cardno:string,cardtype:string,flag:string,times:string,userid:string|
+--------------------------------------------------------------------+

And the final code to build a Schema from it:

scala> val str = df.select(schema_of_json(df.take(1)(0).get(0).toString).alias("schema")).select(regexp_extract($"schema","struct<(.*?)>",1)).take(1)(0).getAs[String](0)
str: String = cardno:string,cardtype:string,flag:string,times:string,userid:string

scala> val columns = str.split(",").map(x=>x.split(":")).map(x=>x(0))
columns: Array[String] = Array(cardno, cardtype, flag, times, userid)

scala> var schema = (new StructType)
schema: org.apache.spark.sql.types.StructType = StructType()

scala> columns.map(x=>{schema = schema.add(x,StringType,true)})
res154: Array[Unit] = Array((), (), (), (), ())

scala> schema
res159: org.apache.spark.sql.types.StructType = StructType(StructField(cardno,StringType,true), StructField(cardtype,StringType,true), StructField(flag,StringType,true), 
StructField(times,StringType,true), StructField(userid,StringType,true))

scala> schema.simpleString
res160: String = struct<cardno:string,cardtype:string,flag:string,times:string,userid:string>
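Incidentally, the same StructType can be built without the mutable var by folding over the column names; a small equivalent sketch:

import org.apache.spark.sql.types.{StringType, StructType}

// Fold the extracted column names into a StructType instead of mutating a var.
val schema = columns.foldLeft(new StructType())((st, name) => st.add(name, StringType, nullable = true))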

The inferred schema can then be used to parse the JSON:

df.select(from_json($"col",schema) as "json_value").select("json_value.*")
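Put together in a Structured Streaming job, it looks roughly like this. This is a minimal sketch: the bootstrap servers and topic name are placeholders, and schema is assumed to have been inferred beforehand (for example from a small batch read of the same topic), since take cannot be called on a streaming DataFrame:

import org.apache.spark.sql.functions._
import spark.implicits._

// Kafka delivers "value" as binary, so cast it to a string column first.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder
  .option("subscribe", "my_topic")                  // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) as col")

// Apply the schema inferred above and flatten the parsed struct.
val parsed = kafkaDf
  .select(from_json($"col", schema) as "json_value")
  .select("json_value.*")

parsed.writeStream
  .format("console")
  .outputMode("append")
  .start()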

That said, this approach only really suits flat schemas. For nested structures, there is for example a Stack Overflow question on how to define a schema like the following:

struct<abc:struct<name:string>,pqr:struct<address:string>> 

Reference: How to create schema (StructType) with one or more StructTypes?

import org.apache.spark.sql.types._
// $"..." also needs import spark.implicits._ (already in scope in spark-shell)
val name = new StructType().add($"name".string)
scala> println(name.simpleString)
struct<name:string>

val address = new StructType().add($"address".string)
scala> println(address.simpleString)
struct<address:string>

val schema = new StructType().add("abc", name).add("pqr", address)
scala> println(schema.simpleString)
struct<abc:struct<name:string>,pqr:struct<address:string>>

scala> schema.simpleString == "struct<abc:struct<name:string>,pqr:struct<address:string>>"
res4: Boolean = true

scala> schema.printTreeString
root
 |-- abc: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |-- pqr: struct (nullable = true)
 |    |-- address: string (nullable = true)

In that case you would have to write a dedicated method to parse such a schema string.
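Alternatively, if I am reading the API correctly, from Spark 2.3 onwards from_json also has an overload that takes the schema as a DDL string plus an options map, so the string returned by schema_of_json could perhaps be fed back in directly, nested structs included, without hand-building a StructType. A sketch under that assumption:

import org.apache.spark.sql.functions._
import spark.implicits._

// Infer the DDL-style schema string from a sample value, as above ...
val ddl = df.select(schema_of_json(df.take(1)(0).getString(0)) as "schema")
  .take(1)(0).getAs[String](0)

// ... and hand the DDL string to from_json instead of a StructType.
df.select(from_json($"col", ddl, Map.empty[String, String]) as "json_value")
  .select("json_value.*")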


Finally, a shameless plug for a project where I collect notes on Linux, big data, machine learning, data analysis, algorithms and more; recommendations for further resources are welcome: Coding-Now

Study notes, plus the eBooks, video resources, and the blogs, sites, and tools I have found worth keeping: github.com/josonle/Big…
