An old question of mine
The MapReduce programming model is easy to follow: the Mapper defines the per-record processing logic, the MapTask writes its output to disk, the ReduceTask pulls that output, and the Reducer defines the per-group processing logic. In Spark everything is an RDD: within a stage, data flows through nested iterators one record at a time, and each record passes through the logic we define as it flies by.
// wordcount
sc.textFile("xxx").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)
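The "flying through nested iterators" part is just how Scala iterators compose. A minimal sketch with plain Scala iterators (not Spark code) shows the idea:
// Plain Scala iterators (no Spark): each step only wraps the previous iterator,
// and records stream through the whole chain one at a time when the consumer pulls them.
val lines: Iterator[String] = Iterator("hello world", "hello spark")
val words = lines.flatMap(_.split(" "))   // lazy: nothing has been split yet
val pairs = words.map(w => (w, 1))        // still lazy
pairs.foreach(println)                    // only now do records flow: (hello,1), (world,1), ...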
Everything before reduceByKey is one stage, and everything after it is another. I had always wondered: where does the aggregation logic actually happen?
After digging through the source it finally became clear: if mapSideCombine is enabled, the combine is already done during the shuffle write; and by the time the downstream task finishes pulling the data in the shuffle read, any sorting or aggregation has been completed as well.
An RDD is an abstraction over data: it stores no data itself, it only defines the computation logic.
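Where the map-side combine comes from is easiest to see in how reduceByKey is built. Roughly speaking (a simplified sketch, not the exact PairRDDFunctions source; wordCountSketch is just a hypothetical helper name), reduceByKey(_ + _) amounts to a combineByKey call whose three functions all derive from the reduce function, with mapSideCombine left at its default of true:
// A simplified sketch of what reduceByKey(_ + _) boils down to (not the exact Spark source).
// The three functions below form the Aggregator that both shuffle write and shuffle read will use.
import org.apache.spark.SparkContext

def wordCountSketch(sc: SparkContext): Unit = {
  val pairs = sc.textFile("xxx").flatMap(_.split(" ")).map((_, 1))
  val counts = pairs.combineByKey(
    (v: Int) => v,                  // createCombiner: the first value of a key becomes the combiner
    (c: Int, v: Int) => c + v,      // mergeValue: folds more values in, on the map side (mapSideCombine = true)
    (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners: merges partial results on the read side (combineCombinersByKey)
  counts.foreach(println)
}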
Reader source code analysis
Except for the first (leftmost) stage, the leftmost RDD of every other stage gets its data from the Shuffle Reader.
// ShuffledRDD.compute — here we can see the reader is obtained from the ShuffleManager
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
Only one reader is provided (BlockStoreShuffleReader), unlike the write path, which has several writer implementations.
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
// Only fetch the partitions this task needs to compute
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
Now look at reader.read():
/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
// Fetch the blocks this task needs, yielding (BlockId, InputStream) pairs
val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
val serializerInstance = dep.serializer.newInstance()
// Create a key/value iterator for each stream
// After this step the byte streams have been turned into key/value records
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
// Note: the asKeyValueIterator below wraps a key/value iterator inside of a
// NextIterator. The NextIterator makes sure that close() is called on the
// underlying InputStream when all records have been read.
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}
// Update the context task metrics for each record read.
val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
// The whole pipeline is nested iterators: every record passes through the metrics update as it is pulled
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
readMetrics.incRecordsRead(1)
record
},
context.taskMetrics().mergeShuffleReadMetrics())
// An interruptible iterator must be used here in order to support task cancellation
// One more layer of wrapping
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
// From here the logic branches
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
// combine is enabled (an aggregator is defined)
if (dep.mapSideCombine) {
// The map side has already combined, so the data fetched here is already aggregated
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
// Combining already-combined values is the aggregator's third function, mergeCombiners
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
} else {
// For operators like groupByKey (no map-side combine), the aggregation happens here
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}
// Sort the output if there is a sort ordering defined.
// e.g. sortByKey sets a keyOrdering, so its sort happens here at read time
val resultIter = dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data.
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was finished/cancelled.
context.addTaskCompletionListener(_ => {
sorter.stop()
})
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =>
aggregatedIter
}
resultIter match {
case _: InterruptibleIterator[Product2[K, C]] => resultIter
case _ =>
// Use another interruptible iterator here to support task cancellation as aggregator
// or(and) sorter may have consumed previous interruptible iterator.
new InterruptibleIterator[Product2[K, C]](context, resultIter)
}
}
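To make the two branches concrete, here is a minimal sketch of the read-side semantics in plain Scala (this is not Spark's Aggregator code; the names just mirror its createCombiner / mergeValue / mergeCombiners fields). With mapSideCombine (reduceByKey), the read side only needs mergeCombiners to merge partial results; without it (groupByKey), the read side builds combiners from raw values with createCombiner and mergeValue:
// Minimal sketch of the two read-side paths (plain Scala, not Spark's Aggregator).
object AggregatorSketch {
  def combineValuesByKey[K, V, C](iter: Iterator[(K, V)],
                                  createCombiner: V => C,
                                  mergeValue: (C, V) => C): Map[K, C] =
    iter.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
    }

  def combineCombinersByKey[K, C](iter: Iterator[(K, C)],
                                  mergeCombiners: (C, C) => C): Map[K, C] =
    iter.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }

  def main(args: Array[String]): Unit = {
    // groupByKey-style: no map-side combine, raw values arrive at the reducer
    val grouped = combineValuesByKey(
      Iterator(("a", 1), ("a", 1), ("b", 1)),
      (v: Int) => List(v), (c: List[Int], v: Int) => v :: c)
    // reduceByKey-style: the map side already combined, only partial sums arrive
    val reduced = combineCombinersByKey(Iterator(("a", 2), ("a", 1), ("b", 1)), (x: Int, y: Int) => x + y)
    println(grouped) // Map(a -> List(1, 1), b -> List(1))
    println(reduced) // Map(a -> 3, b -> 1)
  }
}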