Spark读hive text表之非shuffle方式增大并行度

2,064 阅读2分钟

背景介绍:

  • cdh集群、hadoop2.6.0、spark2.3.0
  • hive表:text格式存储
  • 数据块:128M
  • 处理过程:读取hive表 -> 业务处理(无聚合操作) -> 写入hive、es

问题描述:

正常情况下,一个spark task要处理一个partition即128M的数据,因处理过程较耗时而成为任务瓶颈。

解决过程:

大的方向是进行任务拆分,增大并行度。

  • 方法一:使用spark提供的repartition/coalesce

优点:RDD中定义的算子,可以直接使用
缺点:使用以上算子来增大并行度,一定会进行shuffle操作
结论:测试发现,虽然增大了业务处理的并行度,但shuffle操作的开销比较大,因此整体的耗时没有明显减少。

  • 方法二:基于spark读text格式文件的分片算法,从源头减小数据块以增大并行度

初始化SparkSession时进行如下代码设置:

.config("mapreduce.input.fileinputformat.split.minsize","67108864") // 即为想设置的分片大小:64M
.config("mapreduce.job.maps","1000")  // 确保分片足够大

用以实现spark读取hive时,一个task处理一个64M的数据块。
优点:理论来说,并行度扩大一倍,耗时将减少一半。
结论:测试发下,耗时确实大幅度下降。

源码分析

调用链: HadoopTableReader#createHadoopRdd

HadoopRDD#getPartitions
  FileInputFormat#getSplits
    FileInputFormat#computeSplitSize

核心代码片段

private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
    0 // will splitted based on block by default.
  } else {
    math.max(hadoopConf.getInt("mapreduce.job.maps", 1),
      sparkSession.sparkContext.defaultMinPartitions)
  }

val rdd = new HadoopRDD(
      sparkSession.sparkContext,
      _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
      Some(initializeJobConfFunc),
      inputFormatClass,
      classOf[Writable],
      classOf[Writable],
      _minSplitsPerRDD)

由HadoopTableReader生成HadoopRDD,参数:_minSplitsPerRDD在非local模式下可通过mapreduce.job.maps设置


public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    // 多处省略

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

   long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize); 
    

   return splits.toArray(new FileSplit[splits.size()]);
  }

protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";

最终数据分片的大小由Math.max(minSize, Math.min(goalSize, blockSize))计算得到,根据源码可知:

blockSize:hdfs实际存储的blockSize,128M不可变
goalSize:totalSize / (numSplits == 0 ? 1 : numSplits)
numSplits local模式下为0;其他模式可通过:mapreduce.job.maps 配置

minSize:SPLIT_MINSIZE与minSplitSize的最大值
SPLIT_MINSIZE:默认为1,可通过mapreduce.input.fileinputformat.split.minsize 配置
minSplitSize:默认为1

默认情况下, 返回结果为128M。为了让计算结果为减小,比如64M,只需要 minSize为64M,Math.min(goalSize, blockSize)足够小即可,即:

  • 设置 numSplits 足够大比如1000(参数:mapreduce.job.maps),就能保证goalSize足够小,进而保证Math.min(goalSize, blockSize)足够小
  • 设置 SPLIT_MINSIZE 为64M(参数:mapreduce.input.fileinputformat.split.minsize),根据 Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize),即可实现 minSize 为64M

结论总结

结合spark分片机制进行参数设置,既提高任务并行度又避免shuffle的性能损耗。