Spark调优总结

2,597 阅读3分钟

数据倾斜(shuffle优化)

1. etl预计算
2. 提前过滤不需要的大key
3. 设置shuffle read时的task数量 (spark.sql.shuffle.partitions,spark.sql.shuffle.targetPostShuffleInputSize)
4. sortMergeJoin转boardcastJoin
5. group by 操作 分局部聚合和整体聚合(通过对key先加随机数完成)
6. join 操作 对 大 key分情况处理(对大key端的rdd 加随机数,小key端的膨胀N倍)
7. join 操作直接加随机数和膨胀N倍

HashShuffleManager:shuffle write 每个task每个partition写一个文件
SortShuffleManager:shuffle write 每个task每个写一个数据文件和一个索引文件,且数据有序,边排序边写入
BypssSortShuffleManager:先根据HashShuffleManager写多个文件,然后再聚合成一个文件,不拍戏

小文件合并

1. 输入端小文件合并
    spark.hadoop.mapreduce.input.fileinputformat.split.maxsize。小于则合并
    spark.hadoop.mapreduce.input.fileinputformat.split.minsize。大于则拆分
    spark.hadoopRDD.targetBytesInPartition。spark map端合并小文件
    spark.hadoop.hive.exec.orc.split.strategy = etl
2. shuffle完成合并小文件
    spark.sql.shuffle.partitions
    spark.sql.shuffle.targetPostShuffleInputSize  
    只负责计算shuffle task数量(min(spark.sql.shuffle.partitions,shuffleRead/spark.sql.adaptive.shuffle.targetPostShuffleInputSize)),不进行真正的小文件合并  
3. 作业后单独启动map task合并小文件
    spark.sql.mergeSmallFileSize 合并小文件阈值
    max(spark.sql.mergeSmallFileSize, spark.sql.targetBytesInPartitionWhenMerge  , spark.hadoopRDD.targetBytesInPartition ) 合并小文件实际map输入   
4. 不同分区的数据分布       
    1. 根据分区进行distribute,然后对于不同大小的分区设置不同的task数目 
       distribute by partition_columns,case when partition_column when floor(rand()*parition_weight)
    2. 依赖ORC特性,将相同用户数据放一起,   
       distribute by partition_columns,case when partition_column then hash(coalesce(device_id,session_id))%parition_weight

其他效率问题

1. 数据过大 -> 拆分子任务并行执行
2. left join 过滤条件不起作用 -> 谓词下推使用自查询
3. a.避免创建重复的rdd b.尽可能复用同一个rdd c.对多次使用的rdd进行cache
4. mapParition/foreachPartition 替代 map,foreach,使用repartitionAndSortWithinPartitions替代repartition与sort类操作(需要注意一个partition过大OOM的问题)
5. 使用reduceByKey/aggregateByKey替代groupByKey
6. filter后使用coalesce压缩数据,重新分区
7. 使用高性能序列化框架 kryo

参数调优

1. 内存能力
	spark.driver.memory
	spark.executor.memory
	spark.executor.memoryOverHead
	spark.memory.fraction. (cache+shuffle/memory+memoryOverHead)
	spark.memory.storageFraction (cache/cache+shuffle)
	jvm:
	  -Xmn (年轻代)
	  -XX:NewRatio (老年代/年轻代)
	  -XX:+UseG1GC (启用G1垃圾回收器)
2. 并行度
	spark.executor.cores
	spark.default.parallelism (spark默认task数量)
	spark.dynamicAllocation.enabled
    spark.dynamicAllocation.minExecutors
    spark.dynamicAllocation.maxExecutors	    	
3. shuffle
    spark.sql.adaptive.enabled
    spark.sql.shuffle.partitions
    spark.sql.adaptive.shuffle.targetPostShuffleInputSize
    spark.sql.adaptive.join.enabled (自动转换广播)
    spark.sql.adaptiveBroadcastJoinThreshold (自动转换广播阈值)
    spark.shuffle.file.buffer
    spark.reducer.maxSizeInFlight
4. 小文件
	spark.hadoop.targetBytesInParition
	spark.hadoop.mapreduce.input.fileinputformat.split.maxsize
    spark.hadoop.mapreduce.input.fileinputformat.split.minsize

SparkContext初始化

1. 启动AM(进程)
2. AM启动DRIVER(线程)
3. DRIVER 运行客户端程序,创建SC
4. SC创建TaskScheduler(Yarn Cluster)	
5. SchedulerBackend(Yarn Cluster)	
	启动DriverEndPoint用来和Executor通信
	启动YarnSchedulerEndPoint用来和AM通信
6. 启动DagScheduler	
6. TaskScheduler将sc注册给AM     
7. AM启动AMEndpoint并发送给YarnSchedulerEndPoint(这样YarnSchedulerEndPoint就能和AM通信了)
8. AM启动YarnAllocator 并 创建定时任务触发资源申请

动态申请资源

1. Spark程序运行过程中DAGScheduler	维护PendingTask,SchedulerBackEnd维护RunningTasks
2. ExecutorAllocationManager 定时收集pendingTask和RunningTask,根据Core来计算需要的Executor
3. ExecutorAllocationManager 调用SchedulerBackEnd 将需要的executor数发送出去
4. SchedulerBackEnd 调用子类 YarnSchedulerBackEnd 的 YarnSchedulerEndPoint 将需要的executor 发送给AM
5. AM更新YarnAllocator的需要的Executor数,等待AM的定时任务触发资源申请