Spark_数据倾斜简单总结

777 阅读8分钟

什么是数据倾斜


数据倾斜是一种很常见的问题(依据二八定律),简单来说,比方 WordCount 中某个 Key 对应的数据量非常大的话,就会产生数据倾斜,导致两个后果:

  • OOM(单或少数的节点);
  • 拖慢整个 Job 执行时间(其他已经完成的节点都在等这个还在做的节点)。

解决数据倾斜需要


  • 搞定 Shuffle;
  • 搞定业务场景;
  • 搞定 CPU core 的使用情况;
  • 搞定 OOM 的根本原因等:一般都因为数据倾斜(某 Task 任务的数据量过大,GC 压力大,和 Kafka 不同在于 Kafka 的内存不经过 JVM,其基于 Linux 的 Page)。

导致 Spark 数据倾斜的本质


Shuffle 时,需将各节点的相同 key 的数据拉取到某节点上的一个 task 来处理,若某个 key 对应的数据量很大就会发生数据倾斜。比方说大部分 key 对应 10 条数据,某 key 对应 10 万条,大部分 task 只会被分配 10 条数据,很快做完,个别 task 分配 10 万条数据,不仅运行时间长,且整个 stage 的作业时间由最慢的 Task 决定。

数据倾斜只会发生在 Shuffle 过程,以下算法可能触发 Shuffle 操作: distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。

定位最慢的 Task 所处的源码位置


步骤一: 看数据倾斜发生在哪个 stage(也就是看以上算子出现在哪个阶段)。yarn-client 模式下查看本地 log 或 Spark Web UI 中当前运行的是哪个 stage;yarn-cluster 模式下,通过 Spark Web UI 查看运行到了哪个 Stage。 主要看最慢的 Stage 各 task 分配的数据量,来确定是否是数据倾斜。

步骤二:根据 Stage 划分,推算倾斜发生的代码(必然有 Shuffle 类算子)。简单实用方法:只要看到 shuffle 类算子或 Spark SQL 的 SQL 语句会有 Shuffle 类的算子的句子,就可以该地方划分为前后两个 Stage。(之前用 Python 的 PySpark 接口,Spark Web UI 会查看 task 在源码中的行数,Java 或者 Scala 虽没用过,但我想应该有)

解决方案


方案一:使用 Hive ETL 预处理

场景:若 Hive 表中数据不均匀,且业务中会频繁用 Spark 对 Hive 表分析; 思路:用 Hive 对数据预处理(对 key 聚合等操作),原本是 Spark 对 Hive 的原表操作,现在就是对 Hive 预处理后的表操作; 原理:从根源解决了数据倾斜,规避了了 Spark 进行 Shuffle 类算子操作。但 Hive ETL 中进行聚合等操作会发生数据倾斜,只是把慢转移给了 Hive ETL; 优点:方便,效果好,规避了 Spark 数据倾斜; 缺点:治标不治本,Hive ETL 会数据倾斜。

方案二:过滤导致倾斜的 key

场景:发生倾斜的 key 很少且不重要; 思路:对发生倾斜的 key 过滤掉。比方在 Spark SQL 中用 where 子句或 filter 过滤,若每次作业执行,需要动态判定可使用 sample 算子对 RDD 采样后取数据量最多的 key 过滤; 原理:对倾斜的 key 过滤后,这些 key 便不会参与后面的计算,从本质上消除数据倾斜; 优点:简单,效果明显; 缺点:适用场景少,实际中导致倾斜的 key 很多。

方案三:提高 Shuffle 操作并行度

场景:任何场景都可以,优先选择的最简单方案; 思路:对 RDD 操作的 Shuffle 算子传入一个参数,也就是设置 Shuffle 算子执行时的 Shuffle read task 数量。对于 Spark SQL 的 Shuffle 类语句(如 group by,join)即 spark.sql.shuffle.partitions,代表 shuffle read task 的并行度,默认值是 200 可修改; 原理:增大 shuffle read task 参数值,让每个 task 处理比原来更少的数据;
优点:简单,有效; 缺点:缓解的效果很有限。

方案四:双重聚合(局部聚合 + 全局聚合)

场景:对 RDD 进行 reduceByKey 等聚合类 shuffle 算子,SparkSQL 的 groupBy 做分组聚合这两种情况
思路:首先通过 map 给每个 key 打上 n 以内的随机数的前缀并进行局部聚合,即 (hello, 1) (hello, 1) (hello, 1) (hello, 1) 变为(1_hello, 1) (1_hello, 1) (2_hello, 1),并进行 reduceByKey 的局部聚合,然后再次 map 将 key 的前缀随机数去掉再次进行全局聚合; 原理:对原本相同的 key 进行随机数附加,变成不同 key,让原本一个 task 处理的数据分摊到多个 task 做局部聚合,规避单 task 数据过量。之后再去随机前缀进行全局聚合; 优点:效果非常好(对聚合类 Shuffle 操作的倾斜问题); 缺点:范围窄(仅适用于聚合类的 Shuffle 操作,join 类的 Shuffle 还需其它方案)。

方案五:将 reduce join 转为 map join

场景:对 RDD 或 Spark SQL 使用 join 类操作或语句,且 join 操作的 RDD 或表比较小(百兆或 1,2G); 思路:使用 broadcast 和 map 类算子实现 join 的功能替代原本的 join,彻底规避 shuffle。对较小 RDD 直接 collect 到内存,并创建 broadcast 变量;并对另外一个 RDD 执行 map 类算子,在该算子的函数中,从 broadcast 变量(collect 出的较小 RDD)与当前 RDD 中的每条数据依次比对 key,相同的 key 执行你需要方式的 join; 原理:若 RDD 较小,可采用广播小的 RDD,并对大的 RDD 进行 map,来实现与 join 同样的效果。简而言之,用 broadcast-map 代替 join,规避 join 带来的 shuffle(无 Shuffle 无倾斜); 优点:效果很好(对 join 操作导致的倾斜),根治; 缺点:适用场景小(大表 + 小表),广播(driver 和 executor 节点都会驻留小表数据)小表也耗内存。

方案六:采样倾斜 key 并分拆 join 操作

场景:两个较大的(无法采用方案五)RDD/Hive 表进行 join 时,且一个 RDD/Hive 表中少数 key 数据量过大,另一个 RDD/Hive 表的 key 分布较均匀(RDD 中两者之一有一个更倾斜); 思路:

  1. 对更倾斜 RDD1 进行采样(RDD.sample)并统计出数据量最大的几个 key;
  2. 对这几个倾斜的 key 从原本 RDD1 中拆出形成一个单独的 RDD1_1,并打上 0~n 的随机数前缀,被拆分的原 RDD1 的另一部分(不包含倾斜 key)又形成一个新 RDD1_2;
  3. 对 RDD2 过滤出 RDD1 倾斜的 key,得到 RDD2_1,并将其中每条数据扩 n 倍,对每条数据按顺序附加 0~n 的前缀,被拆分出 key 的 RDD2 也独立形成另一个 RDD2_2; 【个人认为,这里扩了 n 倍,最后 union 完还需要将每个倾斜 key 对应的 value 减去 (n-1)】
  4. 将加了随机前缀的 RDD1_1 和 RDD2_1 进行 join(此时原本倾斜的 key 被打散 n 份并被分散到更多的 task 中进行 join); 【个人认为,这里应该做两次 join,两次 join 中间有一个 map 去前缀】
  5. 另外两个普通的 RDD(RDD1_2、RDD2_2)照常 join;
  6. 最后将两次 join 的结果用 union 结合得到最终的 join 结果。 原理:对 join 导致的倾斜是因为某几个 key,可将原本 RDD 中的倾斜 key 拆分出原 RDD 得到新 RDD,并以加随机前缀的方式打散 n 份做 join,将倾斜 key 对应的大量数据分摊到更多 task 上来规避倾斜;

优点:前提是 join 导致的倾斜(某几个 key 倾斜),避免占用过多内存(只需对少数倾斜 key 扩容 n 倍);
缺点:对过多倾斜 key 不适用。

方案七:用随机前缀和扩容 RDD 进行 join

场景:RDD 中有大量 key 导致倾斜; 思路:与方案六类似。

  1. 查看 RDD/Hive 表中数据分布并找到造成倾斜的 RDD / 表;
  2. 对倾斜 RDD 中的每条数据打上 n 以内的随机数前缀;
  3. 对另外一个正常 RDD 的每条数据扩容 n 倍,扩容出的每条数据依次打上 0 到 n 的前缀;
  4. 对处理后的两个 RDD 进行 join。

原理:与方案六只有唯一不同在于这里对不倾斜 RDD 中所有数据进行扩大 n 倍,而不是找出倾斜 key 进行扩容(这是方案六); 优点:对 join 类的数据倾斜都可处理,效果非常显著; 缺点:缓解,扩容需要大内存。 【个人认为,这里和方案六一样,也需要对扩容的 key 对应的 value 最后减去 (n-1),除非只需大小关系,对值没有要求】

方案八:多种方案组合

实际中,需综合着对业务全盘考虑,可先用方案一和二进行预处理,同时在需要 Shuffle 的操作提升 Shuffle 的并行度,最后针对数据分布选择后面方案中的一种或多种。实际中需要对数据和方案思路理解灵活应用。