前序
在Spark的历史版本中,对于Shuffle Manager有两种实现。在1.2版本之前的Hash Base Shuffler,以及从1.2版本开始后的基于Sort Base Shuffler。至于Hash Base Shuffler,目前以及被移除,也不是本文重点。本文主要介绍基于Sort Base Shuffler的3中Shuffler Write的选择策略,以及SortShuffleManager的部分源代码分析。
ShufflerManager初始化
在SparkEnv中,进行了ShfflerManager的初始化,其源代码如下:
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
如源码所示,在获取配置参数spark.shuffle.manager(默认为sort)后,其实无论该参数为sort还是tungsten-sort其选择的ShufflerManager均为SortShuffleManager。
ShuffleManager
在介绍SortShuffleManager类之前,由于该类继承自ShuffleManager,因此先介绍下ShuffleManager接口。该接口提供了6个基本的方法,如下所示。
1.registerShuffle
def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
根据输入参数,返回对应的shuffleHandle类。具体参数如下:
shuffleId:即当前shuffle id。
numMaps:父Rdd的分区数。
dependency:宽依赖,shuffle所依赖的信息,包含父Rdd,分区函数,排序函数,聚合函数,是否需要map端聚合等信息。更详细见ShuffleDependency类。
2.getWriter
def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
根据输入参数,返回对应的ShuffleWriter类。具体参数如下:
handle:对应的shuffleHandle。
mapId:partition id。
context:TaskContext,2.4之前实现类为TaskContextImpl,2.4之后新增了BarrierTaskContext。
metrics:spark指标监控。
3.getReader
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
由reduce task调用,根据startPartition至endPartition-1(包含)读取该reduce task分区对应的数据。
4.unregisterShuffle
def unregisterShuffle(shuffleId: Int): Boolean
移除shuffleId对应的ShuffleManger。
5.shuffleBlockResolver
def shuffleBlockResolver: ShuffleBlockResolver
ShuffleBlockResolver用于实现shuffle的数据块与物理文件位置的映射。目前唯一实现为IndexShuffleBlockResolver类,该类对于同一个map任务所生成的shuffle文件,将存储在同一个文件中,对于每一个reduce需要读取偏移量存储在对应的index文件中。其中.data后缀文件为数据文件,.index后缀为对应的偏移量的索引文件。详情可以查看IndexShuffleBlockResolver类。
6.stop
def stop(): Unit
在2.0(这个版本号不一定正确)之前shuffleManger是有两个实现类的,一个HashShuffleManager,另一个为SortShuffleManager,对应两种shuffle实现(对于具体的HashShuffle实现可以自行搜索)。在之后,只存在SortShuffleManager一个实现类也是默认的实现了类。
SortShuffleManager
SortShuffleManager目前为ShuffleManager的唯一实现,重点看一下参数设置,registerShuffle实现、getWriter实现和getReader实现。
1.shuffleManager参数
spark.shuffle.manager参数用于设置shuffle的方式,默认为sort。对应的解析方法在SparkEnv的create方法中,如下:
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
2.registerShuffle实现
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
- 返回BypassMergeSortShuffleHandle条件是:
- 不需要map端聚合;
- reduce分区数小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认为200)。具体参见SortShuffleWriter.shouldBypassMergeSort(conf, dependency)方法。
- 返回SerializedShuffleHandle条件是:
- 序列化器支持relocation特性,即在序列化流输出中重新排序序列化对象的字节相当于在序列化它们之前重新排序这些元素。直接对序列化的数据进行排序。详情可见Serializer类的supportsRelocationOfSerializedObjects方法。Spark中Serializer的实现类JavaSerializer是不支持此特性的。KryoSerializer在开启了auto-reset之后是支持该功能的,详情可见KryoSerializer类的getAutoReset方法;(spark.serializer参数用于设置序列化方式,默认为org.apache.spark.serializer.JavaSerializer)
- 不需要map端聚合;
- reduce分区数小于16777216;
- BaseShuffleHandle:如果都不满足以上两个,那么就返回BaseShuffleHandle。
3.getWriter实现
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf,
metrics)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
env.conf,
metrics)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
getWrite方法是根据ShuffleHandle的类型来返回具体的ShuffleWrite实现的,因此会使用对应的ShuffleWrite的条件也是对应ShuffleHandle的条件。如下:
- UnsafeShuffleWriter:对应ShuffleHandle类型为SerializedShuffleHandle。
- BypassMergeSortShuffleWriter:对应ShuffleHandle类型为BypassMergeSortShuffleHandle。
- SortShuffleWriter:对应对应ShuffleHandle类型为BaseShuffleHandle。
4.getReader实现
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
startPartition, endPartition, context, metrics)
}
ShuffleReader目前只有一个实现类BlockStoreShuffleReader,即返回该类实例。