Spark ShuffleManager内存缓冲器BypassMergeSortShuffleWriter设计思路剖析-Spark商业环境实战

1,316 阅读5分钟

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习。

1 从ShuffeManager讲起

一张图我已经用过多次了,不要见怪,因为毕竟都是一个主题,有关shuffle的。英文注释已经很详细了,这里简单介绍一下:

  • 目前只有一个实现 SortShuffleManager。
  • SortShuffleManager依赖于ShuffleWriter提供服务,通过ShuffleWriter定义的规范,可以将MapTask的任务中间结果按照约束的规范持久化到磁盘。
  • SortShuffleManager总共有三个子类, UnsafeShuffleWriter,SortShuffleWriter ,BypassMergeSortShuffleWriter。
  • SortShuffleManager依赖于ShuffleHandle样例类,主要还是负责向Task传递Shuffle信息。一个是序列化,一个是确定何时绕开合并和排序的Shuffle路径。

官方英文介绍如下:

     * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the 
     * driver and on each executor, based on the spark.shuffle.manager setting. The driver 
     * registers shuffles with it, and executors (or tasks running locally in the driver) can ask * to read and write data.
     
     * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
     * boolean isDriver as parameters.

1 华山论剑之BypassMergeSortShuffleWriter

从命名来看,绝对是投机取巧,绕开合并和排序的ShuffleWriter,姑且称之为投机侠吧。

 * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
 * writes incoming records to separate files, one file per reduce partition, then concatenates these
 * per-partition files to form a single output file, regions of which are served to reducers.
 * Records are not buffered in memory. It writes output in a format

2 华山论剑之成员力量

BypassMergeSortShuffleWriter可是直接开挂的节奏,完全没有什么排序器啊,我来承担一切。我最屌,我承担一切,心声,嘿嘿。

2.1 BypassMergeSortShuffleWriter的孩子:

  • partitionWriters : 看看初始化为数组 ==> private DiskBlockObjectWriter[] partitionWriters,每一个DiskBlockObjectWriter负责处理一个分区的数据。

  • private final int fileBufferSize ==>文件缓冲大小,通过Spark.shuffle.file.buffer属性配置,默认是32KB。

  • private final boolean transferToEnabled => 是否采用NIO的从文件流待文件流的复制方式,spark.file.transferTo属性配置,默认是true。

  • private final int numPartitions => 分区数

  • private final BlockManager blockManager

  • private final Partitioner partitioner => 分区计算器

  • private final ShuffleWriteMetrics writeMetrics

  • private final int shuffleId;

  • private final int mapId ==>map任务的身份标识。

  • private final Serializer serializer;

  • private final IndexShuffleBlockResolver shuffleBlockResolver

  • private FileSegment[] partitionWriterSegments ==>FileSegment数组,每一个DiskBlockObjectWriter对应一个分区,也因此对应一个处理的文件片。

  • @Nullable private MapStatus mapStatus;

  • private long[] partitionLengths;

2 BypassMergeSortShuffleWriter核心实现方法Writer

先欣赏代码段:

    public void write(Iterator<Product2<K, V>> records) throws IOException {
        assert (partitionWriters == null);
        if (!records.hasNext()) {
          partitionLengths = new long[numPartitions];
          shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
          mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
          return;
        }
        final SerializerInstance serInstance = serializer.newInstance();
        final long openStartTime = System.nanoTime();
        
        partitionWriters = new DiskBlockObjectWriter[numPartitions];    <=点睛之笔
        partitionWriterSegments = new FileSegment[numPartitions];       <=点睛之笔
        
        for (int i = 0; i < numPartitions; i++) {              <=点睛之笔(按照分区来写片段)
          final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
            blockManager.diskBlockManager().createTempShuffleBlock();
          final File file = tempShuffleBlockIdPlusFile._2();
          final BlockId blockId = tempShuffleBlockIdPlusFile._1();
          
          partitionWriters[i] =                     <=点睛之笔(得到不同分区的DiskBlockObjectWriter)
            blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
        }                                           
        
        
        // Creating the file to write to and creating a disk writer both involve interacting with
        // the disk, and can take a long time in aggregate when we open many files, so should be
        // included in the shuffle write time.
        writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
    
        while (records.hasNext()) {
          final Product2<K, V> record = records.next();
          final K key = record._1();
          partitionWriters[partitioner.getPartition(key)].write(key, record._2());
        }
    
        for (int i = 0; i < numPartitions; i++) {
          final DiskBlockObjectWriter writer = partitionWriters[i];
          partitionWriterSegments[i] = writer.commitAndGet();  <= 生成一堆临时文件,写入到磁盘
          writer.close();
        }
    
        File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);  <==获取一堆临时文件
        File tmp = Utils.tempFileWith(output);
        try {
        
        
          partitionLengths = writePartitionedFile(tmp);   <==多个分区文件合并
          shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
                                                          <==生成索引
                                                          
                                                          
        } finally {
          if (tmp.exists() && !tmp.delete()) {
            logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
          }
        }
        mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
      }

2 BypassMergeSortShuffleWriter核心实现方法writePartitionedFile

聚合每一个分区文件为正式的Block文件

    Concatenate all of the per-partition files into a single combined file.

    private long[] writePartitionedFile(File outputFile) throws IOException {
        // Track location of the partition starts in the output file
        final long[] lengths = new long[numPartitions];
        if (partitionWriters == null) {
          // We were passed an empty iterator
          return lengths;
        }
    
        final FileOutputStream out = new FileOutputStream(outputFile, true);
        final long writeStartTime = System.nanoTime();
        boolean threwException = true;
        try {
          for (int i = 0; i < numPartitions; i++) {
            final File file = partitionWriterSegments[i].file();
            if (file.exists()) {
              final FileInputStream in = new FileInputStream(file);
              boolean copyThrewException = true;
              try {
                lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
                copyThrewException = false;
              } finally {
                Closeables.close(in, copyThrewException);
              }
              if (!file.delete()) {
                logger.error("Unable to delete file for partition {}", i);
              }
            }
          }
          threwException = false;
        } finally {
          Closeables.close(out, threwException);
          writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
        }
        partitionWriters = null;
        return lengths;
      }

3 BypassMergeSortShuffleWriter核心shuffle write流程

  • 根据分区ID,为每一个分区创建DiskBlockObjectWriter
  • 按照分区ID升序写入正式的Shuffle数据文件
  • 最终通过writeIndexFileAndCommit建立MapTask输出的数据索引

不废话,这张图简直画的太好了,望原图作者看到留言于我。

4 总结

本节内容是作者投入大量时间优化后的内容,采用最平实的语言来剖析 ShuffeManager之统一存储服务BypassMergeSortShuffleWriter设计思路。

秦凯新 于深圳 0:53分