Spark聚合开窗与自定义累加器的高级应用-Spark商业应用实战

2,783 阅读4分钟

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客。版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习。

1 Spark开窗函数与聚合函数

1.1 Spark开窗函数与聚合函数的区别

开窗函数与聚合函数一样,都是对行的集合组进行聚合计算。但是两者却有本质区别,待我细细给你道来,绝对让你震撼无穷。

  • 开窗函数用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它是对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。极端点说:可以返回所有行的同时外带开窗聚合的列。但是 基于GROUP BY 进行聚合是不行的,因为select中不允许出现非GROUP BY 字段。

  • 聚合函数则不同:不允许同时返回所有列的同时外带聚合(sum,max 等)多行的列。

2 Spark聚合开窗函数使用技巧

开窗函数的调用格式为: 函数名(列) OVER(选项)

  • 第一大类:[聚合开窗函数 -> 聚合函数(列) ] OVER (选项),这里的选项可以是 PARTITION BY 子句,但不可是 ORDER BY 子句,选项可以为空,表示聚合函数向开窗函数的转换。

2.1 Spark开窗函数使用技巧

  • 聚合开窗函数 OVER 关键字 : 表示把聚合函数当成聚合开窗函数而不是聚合函数。

  • (1)SQL 标准允许将所有聚合函数用做聚合开窗函数。通过over()进行转换

      sparkSession.sql("select name, class, score, count(name) over() name_count from score")
    

  • (2)开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行 的分区来供进行聚合计算。通过over(partition by 列 ) 进行分组开窗,此处与 GROUP BY 子句不同

       sparkSession.sql("select name, class, score, count(name) over(partition by class) name_count from score").show()
    

可以看到:over只是实现了聚合函数到窗函数的转换。且不用group by。开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不同,因为GROUP BY不允许同时返回所有列的同时外带聚合(sum,max 等)多行的的列。

3 Spark排序开窗函数使用技巧

第二大类:[排序开窗函数 -> 排序函数(列) ] OVER(选项),这里的选项可以是ORDER BY 子句,也可以是OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

  • 对于排序开窗函数来讲,它支持的开窗函数分别为: ROW_NUMBER(行号)、 RANK(排名)、 DENSE_RANK(密集排名)和 NTILE(分组排名)。

      sparkSession.sql("select name, class, score, row_number() over(order by score) rank from
      score").show()
    

    sparkSession.sql("select name, class, score, rank() over(order by score) rank from
    score").show()

    sparkSession.sql("select name, class, score, dense_rank() over(order by score) rank from
    score").show()

    sparkSession.sql("select name, class, score, ntile(6) over(order by score) rank from
    score").show()

4:用户自定义聚合函数(UDAF)

4.1 弱类型 UDAF 函数通过继承 UserDefinedAggregateFunction 来实现用户自定义聚合函数。

4.2 强类型 UDAF 函数通过继承 Aggregator 来实现强类型自定义聚合函数。

4:用户自定义聚合函数(UDF)

4.1 注册自定义函数

字符串拼接:

     spark.udf.register("concat_long_string", 
     (v1: Long, v2: String, split: String) => v1.toString + split + v2)

Json抽取字段值:

  spark.udf.register("get_json_object", (json: String, field: String) => {
  val jsonObject = JSONObject.fromObject(json);
  jsonObject.getString(field)})

udaf全数据拼接:

spark.udf.register("group_concat_distinct", new GroupConcatDistinctUDAF())

5 使用自定义函数

6 累加器高级用法

6.1自定义累加器

6.2如何使用累加器

7 总结

本节内容主要探讨了开船函数和自定义累加器的高阶高级使用案例,可能部分截图来自github公开源码,部分是我的测试案例,如有雷同某位大神私有内容,请直接留言于我,我来重新修正案例。

秦凯新 于深圳