从源码剖析一个 Spark WordCount Job 执行的全过程

905 阅读12分钟

从源码剖析一个 Spark WordCount Job 执行的全过程 | Mz's Blog

WordCount 可以说是分布式数据处理框架的”Hello World”,我们可以以它为例来剖析一个 Spark Job 的执行全过程。

我们要执行的代码为:

sc.textFile("hdfs://...").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect

只有一行,很简单也很经典的代码。这里的collect作为一个 action,将触发一个 Job,现在我们从源码开始剖析这个 Job 执行的全部过程。我这次读的源码是 Spark 1.4.1 的 release 版本。

为了方便描述,我们把上面的代码先进行一下拆分,这样可以清晰的看到每一步生成的 RDD 及其依赖关系,并方便下面分析时进行引用:

val hadoopRDD0 = sc.textFile("hdfs://...")                // HadoopRDD[0]
val mapPartitionsRDD1 = hadoopRDD0.flatMap(_.split(" "))  // MapPartitionsRDD[2]
val mapPartitionsRDD2 = mapPartitionsRDD1.map((_, 1))     // MapPartitionsRDD[2]
val shuffledRDD3 = mapPartitionsRDD2.reduceByKey(_+_)     // ShuffledRDD[3]
shuffledRDD3.collect                                      // action

collect 触发 Job

首先,collect 调用了 SparkContext 上的 runJob 方法。这个方法是一个阻塞方法,会在 Job 完成之前一直阻塞等待,直到 Job 执行完成之后返回所得的结果:

RDD.collect

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

需要注意的是这里传入了一个函数,这个函数就是这个 Job 的要执行的任务。后面我们可以看到,它将会被包装并序列化后发送到要执行它的 executor 上,并在要处理的 RDD 上的每个分区上被调用执行。

DAGScheduler 提交 Job

SparkContext 的 runJob 被调用之后,这个 Job 的信息被递传给了 SparkContext 持有的一个 DAGScheduler 上。DAGScheduler 本身维护着一个消息队列,在收到这个 Job 之后,将给自己的消息队列发送一个 JobSubmitted 消息。这个消息中包含了新生成的一个 JobId, 触发 action 的 RDD,经过清理后的闭包函数,要处理的各个分区的在 RDD 中的索引,以及一些其他信息。

DAGScheduler 的消息队列在收到 JobSubmitted 消息后,将触发调用 handleJobSubmitted 方法。在这个方法中,首先会根据这个触发 action 的 RDD 的依赖信息计算出这个 Job 的所有 Stage。在这个 WordCount 中,我们是在 reduceByKey 生成的 shuffledRDD3(其生成的过程涉及到通用的 combineByKey 方法,具体可以参考这篇文章)上触发的 action,所以我们的 ResultStage 所对应的 finalRDD 就是 shuffledRDD3,ResultStage 所要执行的就是 shuffledRDD3 的所有分区。shuffledRDD3 有一个 ShuffleDependency,指向 mapPartitionsRDD2,据此 ShuffleDependency 会生成一个 ShuffleMapStage,它是 ResultStage 的父 Stage。

根据继承关系分析 Stages

在分析出所有的 Stage 之后,DAGScheduler 会根据 ResultStage 创建出一个 ActiveJob 对象,用来表示这个活跃的 Job。然后提交 ResultStage,但是在真正执行这个 Stage 之前,先递归的判断它有没有父 Stage,若有的话先提交它的父 Stage,并将当前 Stage 加入等待队列;若没有父 Stage,才会真正的开始执行这个 Stage。等待队列中的 Stage,会在父 Stage 都执行完成之后再被执行。

由此可以看出,在一个 Job 中,Stage 之间必须按序执行,后一个 Stage 的执行将依赖前一个 Stage 的结果。一个 Job 只会有一个 ResultStage,并且这个 ResultStage 一定会是整个 Job 的最后一个 Stage,所以 ResultStage 执行的结束也就标志着整个 Job 的结束。

Task 的创建和提交

按照之前的分析,我们的 Job 一共有两个 Stage,一个 ShuffleMapStage,一个 ResultStage,并将先执行 ShuffleMapStage。在执行 Stage 的时候,会按此 Stage 对应的 RDD 的分区数量,对应每一个分区创建一个 Task。如果是 ShuffleMapStage 则创建 ShuffleMapTask,如果是 ResultStage 则创建 ResultTask。这些 Task 在后面将会被序列化后发到其他的 executor 上面去运行。

在这里分析一下每个 Task 包含哪些信息 两种 Task 都会包含的信息有 (1) 当前 Stage 对应的 RDD 对象(轻量级) (2) 当前 Stage 的 ID (3) 要处理的那个分区信息(轻量级),以及该任务可能的最优执行位置(例如,对于 hdfs 上的文件,HadoopRDD 中会记录其每一个分区存储在集群的位置,并将这个位置通过依赖继承到其子 RDD)

除此之外,ShuffleMapTask 还包含了对应的 ShuffleDependency 的对象(这其中实际上有分区的方法,数据合并的方法等计算时所需的信息);ResultTask 还包含了当前这个 Job 最终要执行在每个数据上的函数(在此情况下就是 collect 传给 SparkContext 的那个函数)。

在对每个要处理的分区创建出各个 Task 之后,DAGScheduler 会将同一个 Stage 的各个 Task 合并成一个 TaskSet,并将其提交给 TaskScheduler。至此,调度这些 Task 的工作就交给了 TaskScheduler 来进行。

TaskScheduler 在收到这个 TaskSet 之后,首先为其创建一个 TaskSetManager,这个 TaskSetManager 将辅助任务的调度。然后 TaskScheduler 将会调用 SchedulerBackend 上的 reviveOffers 方法去申请可用的资源。

SchedulerBackend 分配资源 (executors) 和发送 Task

SchedulerBackend 是一个接口,它在不同的部署模式下会有不同的实现(实际上 TaskScheduler 也是这样)。SchedulerBackend 的作用是调度和控制整个集群里面的资源(我是这么理解的,这里的资源指的是可用的 executors),当 reviveOffers 方法被调用后,它会将当前可用的所有资源信息,通过调用 TaskScheduler 的 resourceOffers 提供给 TaskScheduler(实际上这个过程是通过另一个 EndPoint 类以消息队列的方式实现的,这样可以保证同时只会进行一个对资源的申请或释放过程)。

TaskScheduler 在收到当前所有可用的资源信息后,会将这些资源信息按序提供给当前正在执行的多个 TaskSet,每个 TaskSet 再根据这些资源信息将当前可以执行的 Task 序列化后包装到一个 TaskDescription 对象中返回(这个 TaskDescription 对象中也包含了这个任务将要运行在哪个 executor 上),最终通过 TaskScheduler 将所有当前的资源情况可以执行的 Task 对应的 TaskDescription 返回给 SchedulerBackend。

SchedulerBackend 这时才根据每个 TaskDescription 将 executors 资源真正的分配给这些 Task,并记录已分配掉的资源和剩余的资源,然后将 TaskDescription 中序列化后的 Task 通过网络(Spark 使用 akka 框架)发送给它对应的 executor。

executor 执行 Task

集群中的 executor 在收到 Task 后,申请一个线程开始运行这个 Task。这是整个 Job 中最核心的部分了,真正的计算都在这一步发生。首先将其反序列化,然后调用这个 Task 对象上的 runTask 方法。在这里对于 ShuffleMapTask 和 ResultTask,runTask 方法有着不同的实现,并将返回不同的内容。我们分别来分别分析。

对于 ShuffleMapTask,runTask 首先获取对应的 RDD 和 ShuffleDependency。在这里对应的 RDD 是 mapPartitionsRDD2,ShuffleDependency 中则有着合并的计算信息。然后调用 RDD 的 iterator 方法获取一个对应分区数据的迭代器。如果当前 RDD 分区的数据已经在之前计算过了,则会直接去内存或磁盘中获取,否则在此时就会调用 mapPartitionsRDD2 的 compute 方法,根据其依赖去计算它的分区数据。如果 ShuffleDependency 中的 mapSideCombine 标记为 true,就会将 iterator 方法返回的分区数据在这里(也就是 map 端)进行合并(此时要求 ShuffleDependency 中的 aggregator 不为空,aggregator 中包含了如何将数据进行合并的信息)。然后根据 ShuffleDependency 中的 partitioner(默认是一个 HashPartitioner)计算出每条数据在其结果端(就是 shuffleRDD3 中)的分区,并将其写入到本地磁盘中对应的文件中去(在这里写入方法有多种实现方式,1.4.1 的版本默认是用了 SortShuffleManager,还有的其他实现是 HashShuffleManager 和 UnsafeShuffleManager,具体的实现方法在此处就不详说了)。当分区的每条数据都处理完后,runTask 会返回一个 MapStatus,这其中包含了一个 BlockManagerId(标记了这个任务被执行的位置,也就是 Map 后的数据存储的位置)以及每个结果分区(每个 reduceId)的数据的大小信息。最后这个 MapStatus 将通过网络发回给 driver,dirver 将其记录。

ShuffleMapTask.runTask

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    return writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}

对于 ResultTask,runTask 首先也是获取对应的 RDD 和要在数据上执行的函数 func。在这里对应的 RDD 应该是 shuffleRDD3,然后调用 RDD 上的 iterator 获取这个分区的数据,并将其传入 func 函数中,将 func 函数的返回值作为 runTask 的返回值返回。过程看似简单,实际上在 shuffleRDD3 上调用 iterator 时就对应了 shuffle 的 reduce 端的合并。从 shuffleRDD3 的 compute 方法的实现可以看出,它的每个分区的数据都要去执行了 ShuffleMapTask 的 executor 上面获取,所以会产生大量的网络流量和磁盘 IO。这个过程就是 MapReduce 范式中的 shuffle 过程,这里面还有很多的细节我并没有详述,但是这个过程十分关键,它的实现效率直接决定了分布式大数据处理的效率。

ResultTask.runTask

override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}

executor 返回结果

在 runTask 计算结束返回数据后,executor 将其返回的数据进行序列化,然后根据序列化后数据的大小进行判断:如果数据大与某个值,就将其写入本地的内存或磁盘(如果内存不够),然后将数据的位置 blockId 和数据大小封装到一个 IndirectTaskResult 中,并将其序列化;如果数据不是很大,则直接将其封装入一个 DirectTaskResult 并进行序列化。最终将序列化后的 DirectTaskResult 或者 IndirectTaskResult 递传给 executor 上运行的一个 ExecutorBackend 上(通过 statusUpdate 方法)。

ExecutorBackend 如上面的 SchedulerBackend 有着相似的功能(实际上,对于 local 模式,这两个类都由一个 LocalBackend 实现),将结果封入一个 StatusUpdate 消息透传给一个对应的 EndPoint 类,EndPoint 类中收到这个消息后将该消息再通过网络发送给 driver。

driver 接收 executor 返回的结果并释放资源

在 driver 端的 SchedulerBackend 收到这个 StatusUpdate 消息之后,将结果续传给 TaskScheduler,并进行资源的释放,在释放资源后再调用一次 reviveOffers,这样又可以重复上面所描述的过程,将释放出来的资源安排给其他的 Task 来执行。

TaskResultGetter 解析并拉取结果

TaskScheduler 在收到任务结果后,将这个任务标记为结束,然后使用一个 TaskResultGetter 类来进行结果的解析。TaskResultGetter 将结果反序列化,判断如果其是一个 DirectTaskResult 则直接抽取出其中的结果;如果是一个 IndirectTaskResult 则需要根据其中的 blockId 信息去对应的机器上拉取结果。最终都是将结果拉取到 driver 的内存中(这就是我们最好不要在大数据集上执行类似 collect 的方法的原因,它会将所有的数据拉入 driver 的内存中,造成大量的内存开销,甚至内存不足)。然后 TaskResultGetter 会将拉取到的结果递交给 TaskScheduler,TaskScheduler 再将此结果递交给 DAGScheduler。

处理结果并在 Job 完成时返回

DAGScheduler 在收到 Task 完成的消息后,先判断这完成的是一个什么任务。如果是一个 ShuffleMapTask 则需要将返回的结果(MapStatus)记录到 driver 中,并判断如果当前的 ShuffleMapStage 若是已经完成,则去提交下一个 Stage。如果是一个 ResultTask 完成了, 则将其结果递交给 JobWaiter,并标记这个任务以完成。

JobWaiter 是 DAGScheduler 在最开始 submitJob 的时候创建的一个对象,用于阻塞等待任务的完成,并进行结果的处理。JobWaiter 在每收到一个 ResultTask 的结果时,都将结果在 resultHandler 上执行。这个 resultHandler 则是由 SparkContext 传进来的一个函数,其作用是将数据放入一个数组中,这个数组最终将作为 SparkContext.runJob 方法的返回值,被最开始的 collect 方法接收然后返回。若 JobWaiter 收到了每个 ResultTask 的结果,则表示整个 Job 已经完成,此时就停止阻塞等待,于是 SparkContext.runJob 返回一个结果的数组,并由 collect 接收后返回给用户程序。

至此,一个 Spark 的 WordCount 执行结束。

总结

本文从源码的角度详细分析了一个 Spark Job 的整个执行、调度的过程,不过很多东西还只是浅尝辄止,并未完全深入。尽管如此,经过连续好几天的分析,我还是觉得收获颇丰,对 Spark 的实现原理有了更加深入的理解,甚至对 MapReduce 的编程范式以及其 shuffle 过程也增加了不少理解。PS:其实从一开始我到分析结束都是没有做任何记录的,只因为一直一知半解实在不知道如何来做记录,所以只是去查阅一些资料和使劲儿的阅读源码。在我自认为分析结束后,我才开始写这篇记录,但是在写的过程中我才发现我分析的过程有一些并不是很清晰,然后重新去看,才真正弄的比较清晰了。可见写博文是很重要的过程,不仅是将学到的知识分享出来,而且对自身的知识也有很好的加固作用。

参考文件

从源码剖析一个Spark WordCount Job执行的全过程