Spark Shuffle


Overview

Shuffle is not unique to Spark; MapReduce has it too. It is simply how distributed big-data computation works: any distributed computation has to do this. Spark just does a lot of work in its source code to make that distributed computation faster!

Let's talk about Spark shuffle and walk through the logic end to end:

// Spark word count
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("wordcount")
val sc = new SparkContext(conf)
sc.textFile("xxx").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)

This is the code we write as Spark application developers. When the program starts, a SparkContext is created first; there are plenty of details inside it, and they matter a lot for shuffle. When we call reduceByKey, a ShuffledRDD is created underneath, and its dependency on the previous RDD is a ShuffleDependency.
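
A quick way to convince yourself of this is to inspect the types at runtime. A minimal sketch, assuming a spark-shell (or any local) SparkContext named sc; the input path "xxx" is just the placeholder from the example above:

// Sketch: inspect what reduceByKey actually builds (nothing is executed yet,
// so the placeholder path is never read)
val pairs  = sc.textFile("xxx").flatMap(_.split(" ")).map((_, 1))
val counts = pairs.reduceByKey(_ + _)

println(counts.getClass.getSimpleName)                    // ShuffledRDD
println(counts.dependencies.head.getClass.getSimpleName)  // ShuffleDependency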

ShuffleManager

  1. Spark programming starts with new SparkContext(conf) to get the SparkContext object.
  2. While the SparkContext initializes, it creates a SparkEnv, and during SparkEnv initialization a ShuffleManager is created -> SortShuffleManager (see the sketch after this list).

  3. The source says the driver registers shuffles with the ShuffleManager, and this is important! As we will see, what actually gets registered is the ShuffleDependency.
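
From the user side, point 2 shows up as configuration: the shuffle manager is picked by spark.shuffle.manager while SparkEnv is being initialized, and the default value "sort" resolves to SortShuffleManager (in Spark 2.x, "tungsten-sort" does too). A minimal sketch, assuming Spark 2.x and a local master:

// Sketch: the ShuffleManager is created while SparkEnv initializes inside the
// SparkContext constructor; "sort" is the default and maps to SortShuffleManager.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("shuffle-manager-demo")
  .set("spark.shuffle.manager", "sort")   // explicit here, but this is already the default
val sc = new SparkContext(conf)
// At this point SparkContext -> SparkEnv -> SortShuffleManager already exists.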

ShuffleDependency

Take word count as an example: when [RDD A] calls reduceByKey, the underlying combineByKey creates a ShuffledRDD [RDD B], and the dependency between these two RDDs is a ShuffleDependency. While the ShuffleDependency is being created, it registers itself with the ShuffleManager and gets back a ShuffleHandle:

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId,_rdd.partitions.length, this)

Depending on the specifics of the shuffle, a different handle is returned:

/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
*/
override def registerShuffle[K, V, C](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
}
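
To make the three branches concrete, here is a hedged illustration of which handle some common operations should end up with, assuming Spark 2.x defaults (spark.shuffle.sort.bypassMergeThreshold = 200) and the sc from the sketch above; the handle itself is internal, so the comments only state the expected branch:

// Sketch: mapping common operations to the three branches above (assumes `sc`)
val pairs = sc.parallelize(1 to 10000).map(i => (i % 100, i))

// reduceByKey needs map-side aggregation, so the bypass branch is ruled out, and the
// aggregator also rules out the serialized branch -> BaseShuffleHandle
val reduced = pairs.reduceByKey(_ + _, 100)

// No map-side aggregation and 100 <= 200 output partitions
// -> BypassMergeSortShuffleHandle
val small = pairs.repartition(100)

// More than 200 output partitions and no aggregation: with a serializer that supports
// relocation (e.g. Kryo) this becomes a SerializedShuffleHandle; with the default Java
// serializer it falls back to BaseShuffleHandle
val wide = pairs.repartition(1000)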

Shuffle Write & Read

If the dependency between two RDDs is a ShuffleDependency, the DAG is split into two stages at that boundary.
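
The boundary is visible in the lineage: in toDebugString the indentation break ("+-") marks the ShuffleDependency, and that is where the DAGScheduler cuts the job into two stages. A small sketch, again assuming a SparkContext sc and the placeholder path; the printed output is abbreviated:

// Sketch: the lineage shows where the shuffle, and hence the stage boundary, sits
val counts = sc.textFile("xxx").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
println(counts.toDebugString)
// (N) ShuffledRDD[...] at reduceByKey ...        <- downstream stage (shuffle read)
//  +-(N) MapPartitionsRDD[...] at map ...        <- upstream stage (shuffle write)
//    ...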

  • Upstream write (map side)
// Upstream write path (from ShuffleMapTask)
// Get the ShuffleManager
val manager = SparkEnv.get.shuffleManager
// The manager decides how to write; there are three writer implementations
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  • Downstream read (reduce side)
// Downstream read path (ShuffledRDD.compute)
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
}

As you can see, both the write side and the read side go through the ShuffleManager. The way they talk to it is via the handle returned when the shuffle was registered, and the type of that handle depends on the specifics of the shuffle.
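
For completeness, here is how I read the handle-to-writer mapping in the Spark 2.x SortShuffleManager, attached as comments to a small runnable end-to-end example (assumes a spark-shell sc); the read side uses a single reader implementation regardless of the handle:

// Sketch: one action runs both stages end to end (assumes `sc`)
val counts = sc.parallelize(Seq("a b", "b c", "a a"))
  .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// Upstream stage: each ShuffleMapTask asks the manager for a writer based on the handle:
//   BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter
//   SerializedShuffleHandle      -> UnsafeShuffleWriter
//   BaseShuffleHandle            -> SortShuffleWriter
// Downstream stage: ShuffledRDD.compute calls getReader (BlockStoreShuffleReader in 2.x)
// to fetch the map outputs for its partition and merge them.
counts.collect().foreach(println)   // (a,3), (b,2), (c,1) in some order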