Spark-Task如何和Executor绑定的

1,239 阅读2分钟

简单带一下一个spark app的执行。

  1. 我们的代码通过action算子进行启动,action算子里调用了SparkContext的runJob方法。即action算子->SparkContext.runJob
  2. SparkContext.runJob->DAGScheduler.runJob->DAGScheduler.submitJob
  3. 特别细的步骤掠过。根据我们调起action算子的rdd,创建finalStage ->调用DAGScheduler中的submitStage(finalStage)
  4. 这中间的步骤是根据rdd之间的依赖关系进行逆着推,找到宽依赖就创建一个新的stage

5. 找到最左边没有父stage的时候就开始执行task相关的内容

stage和task的关系

逻辑在DAGScheduler的submitMissingTasks方法。一个stage中的最右rdd的分区数决定了这个stage有几个task。

计算向数据移动,具体逻辑在DAGScheduler的getPreferredLocs。这也是分布式离线计算的灵魂。

// rdd的每一个分区都变成了一个个的Task
  partitionsToCompute.map { id =>
  // locality information associated with a partition of a particular RDD.
  // 计算向数据移动。比如数据源是HDFS,会优先选择block所在的node
    val locs = taskIdToLocations(id)
    val part = partitions(id)
    stage.pendingPartitions += id
    new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
      taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
      Option(sc.applicationId), sc.applicationAttemptId)
  }

Task是如何和Executors绑定的呢

具体逻辑在TaskSchedulerImpl的resourceOffers方法(standalone模式下)

摘除部分代码分析

// Offers就是Executor资源。filteredOffers是已经过滤出符合条件的资源
// 然后这shuffleOffers就是打散这些资源,雨露均沾
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
// tasks这个就代表了我们所有的tasks,这玩意是一个二维数组,看我画的图
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray

tasks:二维数组,代表每个executor身上的cpu core

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    // taskSet代表一个stage中的tasks
    for (taskSet <- sortedTaskSets) {
      var launchedAnyTask = false
      var launchedTaskAtCurrentMaxLocality = false
      // 任务本地化,先从最优的开始循环,不行就降级
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        do {
        // resourceOfferSingleTaskSet这个代码会轮训所有的executor
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
    }

最后所有的task和executors的绑定关系就在tasks这个二维数组中保存。