Deep Dive into the Internals of the Spark Shuffle Manager SortShuffleManager - Spark in Commercial Environments


This blog series distills and shares cases drawn from real commercial environments, along with Spark source-code analysis and guidance for commercial practice; please keep following the series. Copyright notice: this series of Spark source-code readings and commercial case studies belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to learn from it.

1 Starting from ShuffleManager

I have used the same overview diagram many times in this series, so bear with me: after all, every post revolves around one topic, shuffle. The English doc comments are already quite detailed, so here is just a brief summary:

  • At present there is only one implementation, SortShuffleManager.
  • SortShuffleManager delegates to ShuffleWriter: through the contract that ShuffleWriter defines, a map task's intermediate results can be persisted to disk in the prescribed format.
  • ShuffleWriter has three concrete subclasses used for the shuffle write path: UnsafeShuffleWriter, SortShuffleWriter, and BypassMergeSortShuffleWriter.
  • SortShuffleManager uses BlockStoreShuffleReader for the shuffle read path (see the BlockStoreShuffleReader post for details).
  • SortShuffleManager relies on the ShuffleHandle classes, whose main job is to pass shuffle information to tasks: SerializedShuffleHandle marks the serialized shuffle path, and BypassMergeSortShuffleHandle marks when the merge-and-sort path can be bypassed.

The official English doc comment reads:

     * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
     * driver and on each executor, based on the spark.shuffle.manager setting. The driver
     * registers shuffles with it, and executors (or tasks running locally in the driver) can ask
     * to read and write data.
     *
     * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
     * boolean isDriver as parameters.
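
For reference, here is an abridged sketch of the ShuffleManager trait itself, taken from the Spark 2.x source with doc comments trimmed; the three methods examined below are registerShuffle, getWriter, and getReader:

    private[spark] trait ShuffleManager {

      // Called on the driver to register a shuffle and get a handle to pass to tasks.
      def registerShuffle[K, V, C](
          shuffleId: Int,
          numMaps: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle

      // Called on executors by map tasks to obtain a writer for their output.
      def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

      // Called on executors by reduce tasks to read partitions [startPartition, endPartition).
      def getReader[K, C](
          handle: ShuffleHandle,
          startPartition: Int,
          endPartition: Int,
          context: TaskContext): ShuffleReader[K, C]

      def unregisterShuffle(shuffleId: Int): Boolean
      def shuffleBlockResolver: ShuffleBlockResolver
      def stop(): Unit
    }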

2 SortShuffleManager in Depth

  • Manages sort-based shuffle: incoming records are sorted by their target partition ID, and each map task ultimately writes one single consolidated output file to disk. There are two sides to this, a write path and a read path.

  • Write path, for example: SortShuffleWriter's write() method takes a map task's records through an in-memory buffer, aggregates and sorts them, and writes them out; intermediate steps such as spilling and merging eventually produce the final on-disk file.

  • Read path, for example: BlockStoreShuffleReader's read() method uses a ShuffleBlockFetcherIterator to fetch and iterate over the data, buffering, aggregating, and sorting it in memory, with part of it spilling to disk.

  • The English doc comment is precise:

       * In sort-based shuffle, incoming records are sorted according to their target partition ids, then
       * written to a single map output file. Reducers fetch contiguous regions of this file in order to
       * read their portion of the map output. In cases where the map output data is too large to fit in
       * memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged
       * to produce the final output file.
    
  • numMapsForShuffle: a member field that maps each shuffle ID to the number of map tasks for that shuffle.

  • shuffleBlockResolver: an IndexShuffleBlockResolver, documented as follows (a concrete sketch of the index lookup follows the quoted comment):

       * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
       * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
       * The offsets of the data blocks in the data file are stored in a separate index file.
       *
       * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
       * as the filename postfix for data file, and ".index" as the filename postfix for index file.
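
To make the index-file layout concrete, here is a minimal sketch of how a reducer's byte range in the .data file is derived from the .index file (the helper name and I/O handling are my own, not Spark's API): the index stores numPartitions + 1 longs, and partition r's bytes span [offsets(r), offsets(r + 1)).

    import java.io.{DataInputStream, File, FileInputStream}

    // Minimal sketch, not Spark's API: look up the byte range of one reduce
    // partition inside a map task's consolidated .data file.
    def partitionRange(indexFile: File, reduceId: Int): (Long, Long) = {
      val in = new DataInputStream(new FileInputStream(indexFile))
      try {
        in.skipBytes(reduceId * 8)   // each offset is an 8-byte long
        val start = in.readLong()    // where partition `reduceId` begins in the .data file
        val end = in.readLong()      // the next offset marks where it ends
        (start, end)
      } finally in.close()
    }

This mirrors what IndexShuffleBlockResolver.getBlockData does when serving a shuffle block; per the naming convention above, shuffle 0, map 5 would produce shuffle_0_5_0.data and shuffle_0_5_0.index.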
    

3 SortShuffleManager's Core Methods

3.1 registerShuffle

 override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }
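
The three-way choice above hinges on two predicates. Condensed from the Spark 2.x source (logging and defensive checks omitted), they amount to the following:

    // Condensed from SortShuffleWriter.shouldBypassMergeSort and
    // SortShuffleManager.canUseSerializedShuffle in Spark 2.x.
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean =
      if (dep.mapSideCombine) {
        false  // map-side aggregation requires the sort-and-merge path
      } else {
        // bypass only below the threshold, 200 partitions by default
        dep.partitioner.numPartitions <= conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      }

    def canUseSerializedShuffle(dep: ShuffleDependency[_, _, _]): Boolean =
      dep.serializer.supportsRelocationOfSerializedObjects &&  // e.g. Kryo; Java serialization does not
        dep.aggregator.isEmpty &&                              // no map-side aggregation
        dep.partitioner.numPartitions <= (1 << 24)             // PackedRecordPointer's partition-id limit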

3.2 getReader

Returns a reader (BlockStoreShuffleReader) for the data in partitions startPartition through endPartition - 1 of the map tasks' output files:

  override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }
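
In practice this is invoked from the reduce side: ShuffledRDD.compute (abridged from the Spark 2.x source) drives the whole read path, mapping one partition of the child RDD to exactly one shuffle partition:

    // Abridged from ShuffledRDD.compute in Spark 2.x.
    override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
      val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
      SparkEnv.get.shuffleManager
        .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
        .read()
        .asInstanceOf[Iterator[(K, C)]]
    }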

3.3 getWriter

Returns the ShuffleWriter matching the given ShuffleHandle:

      override def getWriter[K, V](
          handle: ShuffleHandle,
          mapId: Int,
          context: TaskContext): ShuffleWriter[K, V] = {
        numMapsForShuffle.putIfAbsent(
          handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
        val env = SparkEnv.get
        handle match {
          case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
            // <= key point: the serialized (tungsten-sort) write path
            new UnsafeShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              context.taskMemoryManager(),
              unsafeShuffleHandle,
              mapId,
              context,
              env.conf)
          case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
            // <= key point: the bypass write path, no sorting or merging
            new BypassMergeSortShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              bypassMergeSortHandle,
              mapId,
              context,
              env.conf)
          case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
            // <= key point: the default sort-based write path
            new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
        }
      }
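
The write side is driven symmetrically: ShuffleMapTask.runTask (abridged from the Spark 2.x source, error handling omitted) obtains a writer, streams the map task's records through it, and commits:

    // Abridged from ShuffleMapTask.runTask in Spark 2.x; error handling omitted.
    val manager = SparkEnv.get.shuffleManager
    val writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get   // returns the MapStatus reported to the driver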

4 SortShuffleManager Summary

  • Registers shuffles
  • Obtains the ShuffleWriter for a shuffle's write path
  • Obtains the ShuffleReader for a shuffle's read path

Qin Kaixin, Shenzhen