数据倾斜(shuffle优化)
1. etl预计算
2. 提前过滤不需要的大key
3. 设置shuffle read时的task数量 (spark.sql.shuffle.partitions,spark.sql.shuffle.targetPostShuffleInputSize)
4. sortMergeJoin转boardcastJoin
5. group by 操作 分局部聚合和整体聚合(通过对key先加随机数完成)
6. join 操作 对 大 key分情况处理(对大key端的rdd 加随机数,小key端的膨胀N倍)
7. join 操作直接加随机数和膨胀N倍
HashShuffleManager:shuffle write 每个task每个partition写一个文件
SortShuffleManager:shuffle write 每个task每个写一个数据文件和一个索引文件,且数据有序,边排序边写入
BypssSortShuffleManager:先根据HashShuffleManager写多个文件,然后再聚合成一个文件,不拍戏
小文件合并
1. 输入端小文件合并
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize。小于则合并
spark.hadoop.mapreduce.input.fileinputformat.split.minsize。大于则拆分
spark.hadoopRDD.targetBytesInPartition。spark map端合并小文件
spark.hadoop.hive.exec.orc.split.strategy = etl
2. shuffle完成合并小文件
spark.sql.shuffle.partitions
spark.sql.shuffle.targetPostShuffleInputSize
只负责计算shuffle task数量(min(spark.sql.shuffle.partitions,shuffleRead/spark.sql.adaptive.shuffle.targetPostShuffleInputSize)),不进行真正的小文件合并
3. 作业后单独启动map task合并小文件
spark.sql.mergeSmallFileSize 合并小文件阈值
max(spark.sql.mergeSmallFileSize, spark.sql.targetBytesInPartitionWhenMerge , spark.hadoopRDD.targetBytesInPartition ) 合并小文件实际map输入
4. 不同分区的数据分布
1. 根据分区进行distribute,然后对于不同大小的分区设置不同的task数目
distribute by partition_columns,case when partition_column when floor(rand()*parition_weight)
2. 依赖ORC特性,将相同用户数据放一起,
distribute by partition_columns,case when partition_column then hash(coalesce(device_id,session_id))%parition_weight
其他效率问题
1. 数据过大 -> 拆分子任务并行执行
2. left join 过滤条件不起作用 -> 谓词下推使用自查询
3. a.避免创建重复的rdd b.尽可能复用同一个rdd c.对多次使用的rdd进行cache
4. mapParition/foreachPartition 替代 map,foreach,使用repartitionAndSortWithinPartitions替代repartition与sort类操作(需要注意一个partition过大OOM的问题)
5. 使用reduceByKey/aggregateByKey替代groupByKey
6. filter后使用coalesce压缩数据,重新分区
7. 使用高性能序列化框架 kryo
参数调优
1. 内存能力
spark.driver.memory
spark.executor.memory
spark.executor.memoryOverHead
spark.memory.fraction. (cache+shuffle/memory+memoryOverHead)
spark.memory.storageFraction (cache/cache+shuffle)
jvm:
-Xmn (年轻代)
-XX:NewRatio (老年代/年轻代)
-XX:+UseG1GC (启用G1垃圾回收器)
2. 并行度
spark.executor.cores
spark.default.parallelism (spark默认task数量)
spark.dynamicAllocation.enabled
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
3. shuffle
spark.sql.adaptive.enabled
spark.sql.shuffle.partitions
spark.sql.adaptive.shuffle.targetPostShuffleInputSize
spark.sql.adaptive.join.enabled (自动转换广播)
spark.sql.adaptiveBroadcastJoinThreshold (自动转换广播阈值)
spark.shuffle.file.buffer
spark.reducer.maxSizeInFlight
4. 小文件
spark.hadoop.targetBytesInParition
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize
spark.hadoop.mapreduce.input.fileinputformat.split.minsize
SparkContext初始化
1. 启动AM(进程)
2. AM启动DRIVER(线程)
3. DRIVER 运行客户端程序,创建SC
4. SC创建TaskScheduler(Yarn Cluster)
5. SchedulerBackend(Yarn Cluster)
启动DriverEndPoint用来和Executor通信
启动YarnSchedulerEndPoint用来和AM通信
6. 启动DagScheduler
6. TaskScheduler将sc注册给AM
7. AM启动AMEndpoint并发送给YarnSchedulerEndPoint(这样YarnSchedulerEndPoint就能和AM通信了)
8. AM启动YarnAllocator 并 创建定时任务触发资源申请
动态申请资源
1. Spark程序运行过程中DAGScheduler 维护PendingTask,SchedulerBackEnd维护RunningTasks
2. ExecutorAllocationManager 定时收集pendingTask和RunningTask,根据Core来计算需要的Executor
3. ExecutorAllocationManager 调用SchedulerBackEnd 将需要的executor数发送出去
4. SchedulerBackEnd 调用子类 YarnSchedulerBackEnd 的 YarnSchedulerEndPoint 将需要的executor 发送给AM
5. AM更新YarnAllocator的需要的Executor数,等待AM的定时任务触发资源申请