Spark ShuffleManager Memory Buffers: Dissecting the Design of UnsafeShuffleWriter

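This blog series summarizes and shares cases drawn from real commercial environments, together with Spark source-code walkthroughs and practical guidance; please keep following the series. Copyright notice: this Spark source-code and commercial-practice series belongs to the author (秦凯新). Reproduction is prohibited; study is welcome.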

1 Starting from ShuffleManager

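As in earlier posts of this series (they all share one topic, shuffle), we start from the ShuffleManager overview. The official English comment is already detailed, so here is only a brief summary: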

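  • There is currently only one implementation: SortShuffleManager.
  • SortShuffleManager relies on ShuffleWriter: following the contract that ShuffleWriter defines, the intermediate results of a map task are persisted to disk.
  • SortShuffleManager uses three ShuffleWriter implementations (they are ShuffleWriter subclasses, not subclasses of SortShuffleManager): UnsafeShuffleWriter, SortShuffleWriter and BypassMergeSortShuffleWriter.
  • SortShuffleManager relies on ShuffleHandle, whose main job is to pass shuffle information to tasks. Besides BaseShuffleHandle there are two special handles: SerializedShuffleHandle, which selects the serialized shuffle path (UnsafeShuffleWriter), and BypassMergeSortShuffleHandle, which marks when the merge-and-sort path can be bypassed (BypassMergeSortShuffleWriter).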
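How SortShuffleManager picks one of the three writers can be sketched in a few lines. This is a simplified model of the checks in SortShuffleManager.registerShuffle as found in Spark 2.x; the names HandleSelectionSketch, DepInfo, HandleKind and choose are hypothetical stand-ins (the real code inspects a ShuffleDependency and returns ShuffleHandle objects), and the details differ between Spark versions. The threshold 200 is the default of spark.shuffle.sort.bypassMergeThreshold, and 2^24 is the partition limit of the serialized path.

```java
final class HandleSelectionSketch {
    enum HandleKind { BYPASS_MERGE_SORT, SERIALIZED, BASE }

    // Default of spark.shuffle.sort.bypassMergeThreshold.
    static final int DEFAULT_BYPASS_MERGE_THRESHOLD = 200;
    // The serialized path packs the partition id into 24 bits.
    static final int MAX_PARTITIONS_FOR_SERIALIZED_MODE = 1 << 24;

    /** Hypothetical, simplified stand-in for a ShuffleDependency. */
    static final class DepInfo {
        final boolean mapSideCombine;
        final boolean hasAggregator;
        final boolean serializerSupportsRelocation;
        final int numPartitions;

        DepInfo(boolean mapSideCombine, boolean hasAggregator,
                boolean serializerSupportsRelocation, int numPartitions) {
            this.mapSideCombine = mapSideCombine;
            this.hasAggregator = hasAggregator;
            this.serializerSupportsRelocation = serializerSupportsRelocation;
            this.numPartitions = numPartitions;
        }
    }

    static HandleKind choose(DepInfo dep, int bypassMergeThreshold) {
        // 1) No map-side combine and few partitions: bypass merge-sort
        //    (BypassMergeSortShuffleWriter).
        if (!dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold) {
            return HandleKind.BYPASS_MERGE_SORT;
        }
        // 2) Serialized records can be moved as raw bytes, there is no aggregation,
        //    and the partition id fits into 24 bits: UnsafeShuffleWriter.
        if (dep.serializerSupportsRelocation && !dep.hasAggregator
                && dep.numPartitions <= MAX_PARTITIONS_FOR_SERIALIZED_MODE) {
            return HandleKind.SERIALIZED;
        }
        // 3) Everything else falls back to SortShuffleWriter.
        return HandleKind.BASE;
    }
}
```

The order matters: a job with few partitions and no map-side combine takes the bypass path even if its serializer would also allow the serialized path.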

The official English description (the scaladoc of ShuffleManager) reads:

     * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
     * driver and on each executor, based on the spark.shuffle.manager setting. The driver
     * registers shuffles with it, and executors (or tasks running locally in the driver) can ask
     * to read and write data.
     *
     * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
     * boolean isDriver as parameters.

2 Core members of UnsafeShuffleWriter

static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024;

  • private final BlockManager blockManager;

  • private final IndexShuffleBlockResolver shuffleBlockResolver;

  • private final TaskMemoryManager memoryManager;

  • private final SerializerInstance serializer;

  • private final Partitioner partitioner;

  • private final ShuffleWriteMetrics writeMetrics;

  • private final int shuffleId;

  • private final int mapId;

  • private final TaskContext taskContext;

  • private final SparkConf sparkConf;

  • private final boolean transferToEnabled => whether to use NIO file-to-file copying (transferTo); configured by spark.file.transferTo, default true.

  • private final int initialSortBufferSize => initial size of the sort buffer; configured by spark.shuffle.sort.initialBufferSize, default 4096.

  • private final int inputBufferSizeInBytes;

  • private final int outputBufferSizeInBytes;

  • @Nullable private MapStatus mapStatus;

  • @Nullable private ShuffleExternalSorter sorter;

  • private long peakMemoryUsedBytes = 0; => peak memory usage
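The two configurable fields above are read once, with defaults, when the writer is constructed. A minimal sketch of that lookup, assuming a plain Map stands in for SparkConf; the class WriterConfSketch is hypothetical, and Boolean.parseBoolean is more lenient than SparkConf's own parsing:

```java
import java.util.Map;

final class WriterConfSketch {
    static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;

    final boolean transferToEnabled;
    final int initialSortBufferSize;

    /** `conf` is a plain Map standing in for SparkConf (an assumption of this sketch). */
    WriterConfSketch(Map<String, String> conf) {
        // spark.file.transferTo: use NIO transferTo when merging spill files (default true).
        this.transferToEnabled =
            Boolean.parseBoolean(conf.getOrDefault("spark.file.transferTo", "true"));
        // spark.shuffle.sort.initialBufferSize: initial capacity of the sort buffer (default 4096).
        this.initialSortBufferSize = Integer.parseInt(conf.getOrDefault(
            "spark.shuffle.sort.initialBufferSize",
            String.valueOf(DEFAULT_INITIAL_SORT_BUFFER_SIZE)));
    }
}
```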

3 Core method of UnsafeShuffleWriter: write

Let's look at the key code:

     public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
        // Keep track of success so we know if we encountered an exception
        // We do this rather than a standard try/catch/re-throw to handle
        // generic throwables.
        boolean success = false;
        try {
          while (records.hasNext()) {
            // Key step: insert each map-task record into the sorter
            insertRecordIntoSorter(records.next());
          }
          // Key step: merge the sorted output and persist it to disk
          closeAndWriteOutput();
          success = true;
        } finally {
          if (sorter != null) {
            try {
              sorter.cleanupResources();
            } catch (Exception e) {
              // Only throw this error if we won't be masking another
              // error.
              if (success) {
                throw e;
              } else {
                logger.error("In addition to a failure during writing, we failed during " +
                             "cleanup.", e);
              }
            }
          }
        }
      }
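The comment at the top of write() explains why a success flag is used instead of a standard try/catch/re-throw. A minimal, self-contained sketch of the same pattern follows; the names SuccessFlagPattern, IOAction and runWithCleanup are hypothetical, and a list stands in for the logger:

```java
import java.util.ArrayList;
import java.util.List;

final class SuccessFlagPattern {
    /** Collects messages that would be logged; stands in for logger.error(...). */
    static final List<String> LOG = new ArrayList<>();

    interface IOAction {
        void run() throws Exception;
    }

    static void runWithCleanup(IOAction body, IOAction cleanup) throws Exception {
        boolean success = false;
        try {
            body.run();
            success = true;
        } finally {
            try {
                cleanup.run();
            } catch (Exception e) {
                if (success) {
                    // The body succeeded, so the cleanup failure is the only error: surface it.
                    throw e;
                } else {
                    // The body already failed; log instead of masking the original error.
                    LOG.add("cleanup also failed: " + e.getMessage());
                }
            }
        }
    }
}
```

Because the flag is set only after the body completes, any Throwable from the body (including an Error) leaves success false, without the code having to catch and re-throw it.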

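4 Core method of UnsafeShuffleWriter: insertRecordIntoSorter

Writes map-task records into the sorter. Records are serialized and then sorted in memory by partition ID only; there is no aggregation.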

void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
    assert(sorter != null);
    final K key = record._1();
    // Key step: the partitioner decides which reduce partition the record belongs to
    final int partitionId = partitioner.getPartition(key);
    serBuffer.reset();
    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
    serOutputStream.flush();

    final int serializedRecordSize = serBuffer.size();
    assert (serializedRecordSize > 0);

    // Key step: copy the serialized bytes in serBuffer into Tungsten-managed memory
    sorter.insertRecord(
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
  }
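The sort works on compact pointers rather than on the records themselves. In Spark, ShuffleInMemorySorter keeps one 8-byte PackedRecordPointer per record: 24 bits of partition ID, 13 bits of page number and 27 bits of offset within the page. The sketch below models that layout with plain longs; PackedPointerSketch and its methods are hypothetical, and a stable stream sort stands in for the radix sort or TimSort that Spark actually uses.

```java
import java.util.Arrays;
import java.util.Comparator;

final class PackedPointerSketch {
    static final int PAGE_BITS = 13;
    static final int OFFSET_BITS = 27;
    static final int MAX_PARTITION_ID = (1 << 24) - 1;
    static final long PAGE_MASK = (1L << PAGE_BITS) - 1;
    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;

    /** Packs [24-bit partition id | 13-bit page number | 27-bit in-page offset] into one long. */
    static long pack(int partitionId, int pageNumber, long offsetInPage) {
        if (partitionId < 0 || partitionId > MAX_PARTITION_ID) {
            throw new IllegalArgumentException("partition id out of range: " + partitionId);
        }
        if (pageNumber < 0 || pageNumber > PAGE_MASK) {
            throw new IllegalArgumentException("page number out of range: " + pageNumber);
        }
        if (offsetInPage < 0 || offsetInPage > OFFSET_MASK) {
            throw new IllegalArgumentException("offset out of range: " + offsetInPage);
        }
        return ((long) partitionId << (PAGE_BITS + OFFSET_BITS))
            | ((long) pageNumber << OFFSET_BITS)
            | offsetInPage;
    }

    static int partitionId(long packed) {
        // Unsigned shift: partition ids >= 2^23 set the sign bit of the long.
        return (int) (packed >>> (PAGE_BITS + OFFSET_BITS));
    }

    static int pageNumber(long packed) {
        return (int) ((packed >>> OFFSET_BITS) & PAGE_MASK);
    }

    static long offsetInPage(long packed) {
        return packed & OFFSET_MASK;
    }

    /** Sorts by partition id only; the sort is stable, so insertion order is kept within a partition. */
    static long[] sortByPartition(long[] pointers) {
        return Arrays.stream(pointers).boxed()
            .sorted(Comparator.comparingInt((Long p) -> partitionId(p)))
            .mapToLong(Long::longValue)
            .toArray();
    }
}
```

Sorting 8-byte pointers instead of deserialized objects is what lets this path avoid object overhead; it is also why the serialized path caps the number of partitions at 2^24.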

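5 Core method of UnsafeShuffleWriter: closeAndWriteOutput

Persists the map task's output to disk: it closes the sorter to collect all spill files, merges them into a single data file, writes the index file, and finally records the result in a MapStatus.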

void closeAndWriteOutput() throws IOException {
    assert(sorter != null);
    updatePeakMemoryUsed();
    serBuffer = null;
    serOutputStream = null;
    final SpillInfo[] spills = sorter.closeAndGetSpills();
    sorter = null;
    final long[] partitionLengths;
    final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    final File tmp = Utils.tempFileWith(output);
    try {
      try {
        // Key step: merge all spill files into the final shuffle data file (written to tmp first)
        partitionLengths = mergeSpills(spills, tmp);
      } finally {
        for (SpillInfo spill : spills) {
          if (spill.file.exists() && ! spill.file.delete()) {
            logger.error("Error while deleting spill file {}", spill.file.getPath());
          }
        }
      }
      // Key step: write the index file and commit, renaming tmp to the final data file
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
  }
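closeAndWriteOutput leaves two files per map task: a data file in which each reduce partition is stored contiguously, and an index file holding numPartitions + 1 cumulative offsets starting at 0, which a reducer uses to find its byte range. The sketch below models both steps in memory under simplifying assumptions: MergeAndIndexSketch and Spill are hypothetical (Spill stands in for SpillInfo), and the real mergeSpills also handles compression, encryption and the NIO transferTo fast path (the transferToEnabled flag), all omitted here.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

final class MergeAndIndexSketch {
    /** Hypothetical in-memory spill: bytes laid out partition by partition, plus each partition's length. */
    static final class Spill {
        final byte[] data;
        final long[] partitionLengths;

        Spill(byte[] data, long[] partitionLengths) {
            this.data = data;
            this.partitionLengths = partitionLengths;
        }
    }

    /**
     * Copies partition p of every spill (in spill order) before moving on to p + 1,
     * so each partition ends up contiguous in `out`. Returns the merged partition lengths.
     */
    static long[] mergeSpills(List<Spill> spills, int numPartitions, ByteArrayOutputStream out) {
        long[] merged = new long[numPartitions];
        int[] readPos = new int[spills.size()]; // current read offset inside each spill
        for (int p = 0; p < numPartitions; p++) {
            for (int s = 0; s < spills.size(); s++) {
                Spill spill = spills.get(s);
                int len = (int) spill.partitionLengths[p];
                out.write(spill.data, readPos[s], len);
                readPos[s] += len;
                merged[p] += len;
            }
        }
        return merged;
    }

    /** Index layout: numPartitions + 1 big-endian longs, the cumulative offsets starting at 0. */
    static byte[] indexBytes(long[] partitionLengths) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream idx = new DataOutputStream(buf)) {
            long offset = 0L;
            idx.writeLong(offset);
            for (long length : partitionLengths) {
                offset += length;
                idx.writeLong(offset);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with an in-memory stream
        }
        return buf.toByteArray();
    }
}
```

A reducer for partition p reads the offsets at positions p and p + 1 of the index and fetches exactly that byte range of the data file.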

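6 Summary

UnsafeShuffleWriter stores records in serialized form in Tungsten memory pages allocated through TaskMemoryManager; these pages are off-heap when spark.memory.offHeap.enabled is true and otherwise live on the JVM heap. This is a clear difference from SortShuffleWriter, whose ExternalSorter buffers deserialized Java objects.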

秦凯新, in Shenzhen, 1:19
