阅读 82

Spark任务延迟调度及调度池Pool架构剖析-Spark商业环境实战

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习。

Spark商业环境实战及调优进阶系列

1. 任务延迟调度原理

Spark数据本地化即移动计算而不是移动数据,为了让task能尽可能的以最优本地化级别(Locality Levels)来启动,Spark的延迟调度应运而生,资源不够可在该Locality Levels对应的限制时间内重试,超过限制时间后还无法启动则降低Locality Levels再尝试启动。

2. TaskSetManger重要成员

  • taskSet:TaskSet由当前TaskSetManager管理,一个TaskSet对应一个Stage,一个TaskSet归属于一个TaskSetManager,而TaskSetManager又归属于TaskSetManager Pool来管理。

3. 任务的启动过程

  • 注意 CoarseGrainedSchedulerBackend.makeOffers在任意Executor上变动时,开始调用,makeOffers属于公共方法。

       StatusUpdate
       RegisterExecutor
    复制代码
  • 首先选定一个Executor,选中在指定executor上的任务,以最大优先级分配。

      CoarseGrainedSchedulerBackend.makeOffers(公共方法,任务分配触发点,过滤活的Executor())
                            ||
                            ||
                           \||/
       val tasks = TaskSchedulerImpl.resourceOffer(workOffers) (分配开始)
                            ||
                            ||
                           \||/
      (遍历所有TaskSet内部的Task的优先级,以最大本地性开始分配任务)
      for (currentMaxLocality <- taskSet.myLocalityLevels) 
                            ||
                            ||
                           \||/
          (遍历所有可用的Executor,以指定Executor开始分配)
          for (i <- 0 until shuffledOffers.size)  
                            ||
                            ||
                           \||/
            (选定一个Executor,通过TaskSetManager进行专项任务分配)
             for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {  
                  tasks(i) += task
                  val tid = task.taskId
                  taskIdToTaskSetManager(tid) = taskSet
                  taskIdToExecutorId(tid) = execId
                  executorIdToRunningTaskIds(execId).add(tid)
                  availableCpus(i) -= CPUS_PER_TASK
                  assert(availableCpus(i) >= 0)
                  launchedTask = true
               }
                            ||
                            ||
                           \||/
           (TaskSchedulerImpl引用结束后,返回Tasks,后执行)
           CoarseGrainedSchedulerBackend.launchTasks(taskS)  
    复制代码
  • 通过在CoarseGrainedSchedulerBackend 中的makeOffers方法,通过scheduler的引用,执行TaskSchedulerImpl.resourceOffers 方法,返回taskDescs(包含了所有Task的位置信息和task的算子等),后执行launchTasks,向 executor 发送消息executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

     private def makeOffers() {
     
     val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
     
     val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
     val workOffers = activeExecutors.map {
       case (id, executorData) =>
         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
     }.toIndexedSeq
     
      scheduler.resourceOffers(workOffers)
    复制代码

    } if (!taskDescs.isEmpty) { launchTasks(taskDescs) } }

  • TaskSetManager中resourceOffer内部的是如何分配任务的呢?

  •   -> allowedLocality = getAllowedLocalityLevel(curTime) (延迟调度)
    
      -> dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality,
         speculative)  (返回TaskDescription序列,方便后续发送到Executor)
    复制代码

4 总结

本篇内容还需要完善,并做进一步剖析。

秦凯新 于深圳

关注下面的标签,发现更多相似文章
评论