简单带一下一个spark app的执行。
- 我们的代码通过action算子进行启动,action算子里调用了SparkContext的runJob方法。即
action算子
->SparkContext.runJob
SparkContext.runJob
->DAGScheduler.runJob
->DAGScheduler.submitJob
- 特别细的步骤掠过。根据我们调起action算子的rdd,创建finalStage ->调用DAGScheduler中的
submitStage(finalStage)
- 这中间的步骤是根据rdd之间的依赖关系进行逆着推,找到宽依赖就创建一个新的stage
stage和task的关系
逻辑在DAGScheduler的submitMissingTasks方法。一个stage中的最右rdd的分区数决定了这个stage有几个task。
计算向数据移动,具体逻辑在DAGScheduler的getPreferredLocs。这也是分布式离线计算的灵魂。
// rdd的每一个分区都变成了一个个的Task
partitionsToCompute.map { id =>
// locality information associated with a partition of a particular RDD.
// 计算向数据移动。比如数据源是HDFS,会优先选择block所在的node
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
Task是如何和Executors绑定的呢
具体逻辑在TaskSchedulerImpl的resourceOffers方法(standalone模式下)
摘除部分代码分析
// Offers就是Executor资源。filteredOffers是已经过滤出符合条件的资源
// 然后这shuffleOffers就是打散这些资源,雨露均沾
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
// tasks这个就代表了我们所有的tasks,这玩意是一个二维数组,看我画的图
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
tasks:二维数组,代表每个executor身上的cpu core
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
// taskSet代表一个stage中的tasks
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
// 任务本地化,先从最优的开始循环,不行就降级
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
// resourceOfferSingleTaskSet这个代码会轮训所有的executor
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
最后所有的task和executors的绑定关系就在tasks这个二维数组中保存。