Spark调优-初级

4,565 阅读9分钟

写在前面

先列出所有的调优参数,方便快速查询

set hive.exec.dynamic.partition=true; ##--动态分区
set hive.exec.dynamic.partition.mode=nonstrict; ##--动态分区
set hive.auto.convert.join=true; ##-- 自动判断大表和小表

##-- hive并行
set hive.exec.parallel=true;
set hive.merge.mapredfiles=true;

##-- 内存能力
set spark.driver.memory=8G; 
set spark.executor.memory=2G; 

##-- 并发度
set spark.dynamicAllocation.enabled=true;
set spark.dynamicAllocation.maxExecutors=50;
set spark.executor.cores=2;

##-- shuffle
set spark.sql.shuffle.partitions=100; -- 默认的partition数,及shuffle的reader数
set spark.sql.adaptive.enabled=true; -- 启用自动设置 Shuffle Reducer 的特性,动态设置Shuffle Reducer个数(Adaptive Execution 的自动设置 Reducer 是由 ExchangeCoordinator 根据 Shuffle Write 统计信息决定)
set spark.sql.adaptive.join.enabled=true; -- 开启 Adaptive Execution 的动态调整 Join 功能 (根据前面stage的shuffle write信息操作来动态调整是使用sortMergeJoin还是broadcastJoin)
set spark.sql.adaptiveBroadcastJoinThreshold=268435456; -- 64M ,设置 SortMergeJoin 转 BroadcastJoin 的阈值,默认与spark.sql.autoBroadcastJoinThreshold相同
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728; -- shuffle时每个reducer读取的数据量大小,Adaptive Execution就是根据这个值动态设置Shuffle reader的数量
set spark.sql.adaptive.allowAdditionalShuffle=true; -- 是否允许为了优化 Join 而增加 Shuffle,默认为false
set spark.shuffle.service.enabled=true; 


##-- orc
set spark.sql.orc.filterPushdown=true;
set spark.sql.orc.splits.include.file.footer=true;
set spark.sql.orc.cache.stripe.details.size=10000;
set hive.exec.orc.split.strategy=ETL -- ETL:会切分文件,多个stripe组成一个split,BI:按文件进行切分,HYBRID:平均文件大小大于hadoop最大split值使用ETL,否则BI
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728; -- 128M 读ORC时,设置一个split的最大值,超出后会进行文件切分
set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=67108864; -- 64M 读ORC时,设置小文件合并的阈值

##-- 其他
set spark.sql.hive.metastorePartitionPruning=true;

##-- 广播表
set spark.sql.autoBroadcastJoinThreshold=268435456; -- 256M

##-- 小文件
set spark.sql.mergeSmallFileSize=10485760; -- 10M -- 小文件合并的阈值
set spark.hadoopRDD.targetBytesInPartition=67108864; -- 64M 设置stage 输入端的map(不涉及shuffle的时候)合并文件大小
set spark.sql.targetBytesInPartitionWhenMerge=67108864; --64M 设置额外的(非最开始)合并job的map端输入size

一 运行行为

1.1 动态生成分区

下列Hive参数对Spark同样起作用。

set hive.exec.dynamic.partition=true; // 是否允许动态生成分区

set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分区全部动态生成

set hive.exec.max.dynamic.partitions = 100; // 动态生成的最多分区数

1.2 broadcast join

当大表JOIN小表时,如果小表足够小,可以将大表分片,分别用小表和每个大表的分片进行JOIN,最后汇总,能够大大提升作业性能。

spark.sql.autoBroadcastJoinThreshold 默认值为26214400(25M),如果小表的大小小于该值,则会将小表广播到所有executor中,使JOIN快速完成。如果该值设置太大,则会导致executor内存压力过大,容易出现OOM。

注:ORC格式的表会对数据进行压缩,通常压缩比为2到3左右,但有些表的压缩比就会很高,有时可以达到10。请妥善配置该参数,并配合spark.executor.memory,使作业能够顺利执行。

1.3 动态资源分配

spark.dynamicAllocation.enabled:是否开启动态资源分配,默认开启,同时强烈建议用户不要关闭。理由:开启动态资源分配后,Spark可以根据当前作业的负载动态申请和释放资源。

spark.dynamicAllocation.maxExecutors: 开启动态资源分配后,同一时刻,最多可申请的executor个数。平台默认设置为1000。当在Spark UI中观察到task较多时,可适当调大此参数,保证task能够并发执行完成,缩短作业执行时间。

下图是一个由于并发不足导致作业执行较慢的一个明显的任务:

打开执行时间较长的stage,查看其任务数为2w+。

image.png

点击stage的链接,进入查看stage中的任务,将任务按照Launch Time排序,先有小到大再由大到小。

image.png

image.png

可以看到任务启动时间差了3个多小时。可以确定该任务是由于spark.dynamicAllocation.maxExecutors过小导致的。该参数可以和spark.executor.cores配合增大作业并发度。

spark.dynamicAllocation.minExecutors: 和s,d,maxExecutors相反,此参数限定了某一时刻executor的最小个数。平台默认设置为3,即在任何时刻,作业都会保持至少有3个及以上的executor存活,保证任务可以迅速调度。

1.4 Shuflle相关

spark.sql.shuffle.partitions: 在有JOIN或聚合等需要shuffle的操作时,从mapper端写出的partition个数,默认设置为2000。

select a, avg(c) from test_table group by a语句,不考虑优化行为,如果一个map端的task中包含有3000个a,根据spark.sql.shuffle.partitions=2000,会将计算结果分成2000份partition(例如按2000取余),写到磁盘,启动2000个reducer,每个reducer从每个mapper端拉取对应索引的partition。

当作业数据较多时,适当调大该值,当作业数据较少时,适当调小以节省资源。

spark.sql.adaptive.enabled:是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行。默认开启,同时强烈建议开启。理由:更好利用单个executor的性能,还能缓解小文件问题。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize:和spark.sql.adaptive.enabled配合使用,当开启调整partition功能后,当mapper端两个partition的数据合并后数据量小于targetPostShuffleInputSize时,Spark会将两个partition进行合并到一个reducer端进行处理。平台默认为67108864(64M),用户可根据自身作业的情况酌情调整该值。当调大该值时,一个reduce端task处理的数据量变大,最终产出的数据,存到HDFS上的文件也变大。当调小该值时,相反。

spark.sql.adaptive.minNumPostShufflePartitions: 当spark.sql.adaptive.enabled参数开启后,有时会导致很多分区被合并,为了防止分区过少,可以设置spark.sql.adaptive.minNumPostShufflePartitions参数,防止分区过少而影响性能。

1.5 读ORC表优化

hive.exec.orc.split.strategy参数控制在读取ORC表时生成split的策略。BI策略以文件为粒度进行split划分;ETL策略会将文件进行切分,多个stripe组成一个split;HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256 * 1024 * 1024)时使用ETL策略,否则使用BI策略。

对于一些较大的ORC表,可能其footer较大,ETL策略可能会导致其从hdfs拉取大量的数据来切分split,甚至会导致driver端OOM,因此这类表的读取建议使用BI策略。

对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。

另外,spark.hadoop.mapreduce.input.fileinputformat.split.maxsize参数可以控制在ORC切分时stripe的合并处理。具体逻辑是,当几个stripe的大小大于spark.hadoop.mapreduce.input.fileinputformat.split.maxsize时,会合并到一个task中处理。可以适当调小该值,如set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728。以此增大读ORC表的并发。

二 executor能力

2.1内存

spark.executor.memory:executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存。当executor端由于OOM时,多数是由于spark.executor.memory设置较小引起的。该参数一般可以根据表中单个文件的大小进行估计,但是如果是压缩表如ORC,则需要对文件大小乘以2~3倍,这是由于文件解压后所占空间要增长2~3倍。平台默认设置为2G。

spark.yarn.executor.memoryOverhead:Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。

Spark根据 spark.executor.memory+spark.yarn.executor.memoryOverhead的值向RM申请一个容器,当executor运行时使用的内存超过这个限制时,会被yarn kill掉。在Spark UI中相应失败的task的错误信息为:

Container killed by YARN for exceeding memory limits. XXX of YYY physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

这个时候,适当调大spark.yarn.executor.memoryOverhead。默认设置为1024(1G),注意:该参数的单位为MB。但是,如果用户在代码中无限制的使用堆外内存。调大该参数没有意义。需要用户了解自己的代码在executor中的行为,合理使用堆内堆外内存。

spark.sql.windowExec.buffer.spill.threshold:当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘。该参数如果设置的过小,会导致spark频繁写磁盘,如果设置过大则一个窗口中的数据全都留在内存,有OOM的风险。但是,为了实现快速读入磁盘的数据,spark在每次读磁盘数据时,会保存一个1M的缓存。

举例:当spark.sql.windowExec.buffer.spill.threshold为10时,如果一个窗口有100条数据,则spark会写9((100 - 10)/10)次磁盘,在读的时候,会创建9个磁盘reader,每个reader会有一个1M的空间做缓存,也就是说会额外增加9M的空间。

当某个窗口中数据特别多时,会导致写磁盘特别频繁,就会占用很大的内存空间作缓存。因此如果观察到executor的日志中存在大量如下内容,则可以考虑适当调大该参数,平台默认该参数为40960。

pilling data because number of spilledRecords crossed the threshold

2.2 executor并发度

spark.executor.cores:单个executor上可以同时运行的task数。Spark中的task调度在线程上,该参数决定了一个executor上可以并行执行几个task。这几个task共享同一个executor的内存(spark.executor.memory+spark.yarn.executor.memoryOverhead)。适当提高该参数的值,可以有效增加程序的并发度,是作业执行的更快,但使executor端的日志变得不易阅读,同时增加executor内存压力,容易出现OOM。在作业executor端出现OOM时,如果不能增大spark.executor.memory,可以适当降低该值。平台默认设置为1。

该参数是executor的并发度,和spark.dynamicAllocation.maxExecutors配合,可以提高整个作业的并发度。

2.4 GC优化(使用较少,当尝试其他调优方法均无效时可尝试此方法) executor的JVM参数传递方式为:

set spark.executor.extraJavaOptions="XXXXXXXXXX "。
例如:
set spark.executor.extraJavaOptions="-XX:NewRatio=3 -XX:+UseG1GC"

注:所有的JVM参数必须写在一起,不能分开。 bad case:

    set spark.executor.extraJavaOptions="-XX:NewRatio=3 "; set spark.executor.extraJavaOptions="-XX:+UseG1GC " ;

打开GC打印:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

full GC 频繁:内存不够用,调大spark.executor.memory,调小spark.executor.cores

minor GC频繁,而full GC比较少:可以适当提高Eden区大小-Xmn

如果OldGen区快要满了,适当提高spark.executor.memory(默认2G)或适当降低spark.memory.fraction(默认为0.3)或适当提高-XX:NewRatio(老年代是年轻代的多少倍,一般默认是2)。

如果spark.executor.memory调的很大且GC仍是程序运行的瓶颈,可以尝试启用G1垃圾回收器(-XX:+UseG1GC

修改了GC的参数一定要仔细观察GC的频率和时间。

修改方法:set spark.executor.extraJavaOptions="-XX:NewRatio=3 -XX:+UseG1GC ..."

三 driver指标:

3.1 内存

spark.driver.memory:driver使用内存大小, 平台默认为10G,根据作业的大小可以适当增大或减小此值。

3.2 GC优化

通过**set spark.driver.extraJavaOptions="XXXXXXXXXX "**设置,具体设置内容可参考2.4节,一般情况driver内存较大,可尝试启用G1垃圾回收器。