Spark Source Code Walkthrough: The SparkContext Creation Process (YARN Cluster Mode)


Spark version: 2.2. Source code: github.com/apache/spar…

  • Overview

    In YARN-Cluster mode, once a user submits an application to YARN, YARN runs it in two phases: in the first phase, Spark's Driver is started inside the YARN cluster as an ApplicationMaster; in the second phase, the ApplicationMaster creates the application, requests resources from the ResourceManager on its behalf, and starts Executors to run the Tasks, monitoring the entire run until it completes.

  • Workflow description

    The YARN-cluster workflow consists of the following steps:

    1. The Spark Yarn Client submits the application to YARN, including the ApplicationMaster program, the command used to start the ApplicationMaster, the program to be run inside the Executors, and so on;
    2. Upon receiving the request, the ResourceManager picks a NodeManager in the cluster, allocates the first Container for the application, and asks that NodeManager to start the application's ApplicationMaster in the Container; the ApplicationMaster then performs the initialization of the SparkContext and related components;
    3. The ApplicationMaster registers with the ResourceManager, so the user can check the application's status directly through the ResourceManager; it then requests resources for the individual tasks over the RPC protocol in a polling fashion and monitors their status until the run finishes;
    4. Once the ApplicationMaster has been granted resources (i.e. Containers), it contacts the corresponding NodeManagers and asks them to start CoarseGrainedExecutorBackend in the acquired Containers. After starting, each CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and requests Tasks. This matches Standalone mode, except that when the SparkContext is initialized in the Spark Application it uses CoarseGrainedSchedulerBackend together with YarnClusterScheduler for task scheduling, where YarnClusterScheduler is just a thin wrapper around TaskSchedulerImpl;
    5. The SparkContext inside the ApplicationMaster assigns Tasks to the CoarseGrainedExecutorBackends; each CoarseGrainedExecutorBackend runs its Tasks and reports status and progress back to the ApplicationMaster, so the ApplicationMaster always knows the state of every task and can restart a task when it fails;
    6. When the application finishes, the ApplicationMaster asks the ResourceManager to deregister it and then shuts itself down.
  • Diagram

  • Source code walkthrough

    1. The submit command

      The user invokes the ${SPARK_HOME}/bin/spark-submit script to submit the job; this executes the main function of the SparkSubmit class, starting the main process:

      exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
      
    2. Parsing the command-line arguments

      SparkSubmit parses the script arguments, in particular master and deployMode. For yarn cluster mode it sets childMainClass, the class to be launched next, to org.apache.spark.deploy.yarn.Client, and passes the user's application jar and main class (mainClass) through the --jar and --class arguments. In other words, a NodeManager is first requested to host the ApplicationMaster, and the Driver (the user's main class) is then started inside that ApplicationMaster.

      org.apache.spark.deploy.SparkSubmit 584-603
      // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
      if (isYarnCluster) {
        childMainClass = "org.apache.spark.deploy.yarn.Client"
        if (args.isPython) {
          childArgs += ("--primary-py-file", args.primaryResource)
          childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
        } else if (args.isR) {
          val mainFile = new Path(args.primaryResource).getName
          childArgs += ("--primary-r-file", mainFile)
          childArgs += ("--class", "org.apache.spark.deploy.RRunner")
        } else {
          if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
            childArgs += ("--jar", args.primaryResource)
          }
          childArgs += ("--class", args.mainClass)
        }
        if (args.childArgs != null) {
          args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
        }
      }
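
      To make the parsing result concrete, here is a rough sketch (not Spark source; the class and jar names are made up) of what childMainClass and childArgs end up as for a hypothetical submission such as --master yarn --deploy-mode cluster --class com.example.MyApp myapp.jar arg1:

      // Hypothetical values only; com.example.MyApp and myapp.jar are placeholders.
      val childMainClass = "org.apache.spark.deploy.yarn.Client"
      val childArgs = Seq(
        "--jar",   "myapp.jar",          // args.primaryResource (the user's application jar)
        "--class", "com.example.MyApp",  // args.mainClass (the user's main class)
        "--arg",   "arg1")               // each user argument becomes a ("--arg", value) pair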
      
    3. Launching org.apache.spark.deploy.yarn.Client

      The childMainClass resolved during argument parsing is run via reflection:

      org.apache.spark.deploy.SparkSubmit#main
          119:case SparkSubmitAction.SUBMIT => submit(appArgs) 
      ->org.apache.spark.deploy.SparkSubmit:submit
          162:runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      ->org.apache.spark.deploy.SparkSubmit#runMain
          // Note that childMainClass here is org.apache.spark.deploy.yarn.Client; this Client asks YARN's RM for an NM and starts the AM on it
          // The AM then launches the mainClass the user configured
          712:mainClass = Utils.classForName(childMainClass)
          739:val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
          755:mainMethod.invoke(null, childArgs.toArray)
      

      org.apache.spark.deploy.yarn.Client uses org.apache.hadoop.yarn.client.api.YarnClient to submit the application to YARN:

      org.apache.spark.deploy.yarn.Client#main
          1150:new Client(args, sparkConf).run()
      ->org.apache.spark.deploy.yarn.Client#run
          1091:this.appId = submitApplication()
      org.apache.spark.deploy.yarn.Client:submitApplication    
          161:val containerContext = createContainerLaunchContext(newAppResponse)
          174:yarnClient.submitApplication(appContext)
          
      The appContext wraps the command that launches the ApplicationMaster:
      org.apache.spark.deploy.yarn.Client#createContainerLaunchContext
          887:val userClass =
                  if (isClusterMode) {
                    Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
                  } else {
                    Nil
                  }
          910:val amClass =
                  if (isClusterMode) {
                    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster"). getName
                  } else {
                    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
                  }        
          923: val amArgs =
                  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs++
                  Seq("--properties-file", buildPath(Environment.PWD.?(),LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
              val commands = prefixEnv ++
                  Seq(Environment.JAVA_HOME.?() + "/bin/java", "-server") ++javaOpts ++ amArgs ++
                  Seq(
                    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
                    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") 
      
    4. ApplicationMaster & Driver startup

      Based on its arguments, the ApplicationMaster determines whether it is running in cluster or client mode. In cluster mode it launches the mainClass the user originally passed to the spark-submit script, i.e. the user process, and then waits for the SparkContext to be injected. (Normally the user process instantiates a SparkContext, and during its construction the SparkContext injects itself into the ApplicationMaster via the SchedulerBackend; section 6 covers the details.)

      org.apache.spark.deploy.yarn.ApplicationMaster#main
          763:master = new ApplicationMaster(amArgs, new YarnRMClient)
          System.exit(master.run())
      ->org.apache.spark.deploy.yarn.ApplicationMaster#run    
          253:if (isClusterMode) {
                runDriver(securityMgr)
              } else {
                runExecutorLauncher(securityMgr)
              }
      ->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
          // Launch the user process
          394:userClassThread = startUserApplication()
          // Wait for the SparkContext to be injected
          401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
          Duration(totalWaitTime, TimeUnit.MILLISECONDS))  
      ->org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication
          629:val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
          635:mainMethod.invoke(null, userArgs.toArray)
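
      startUserApplication relies on plain JVM reflection. A minimal, self-contained sketch of the same pattern (the class and argument names are hypothetical) looks like this:

      object ReflectiveLauncher {
        def main(args: Array[String]): Unit = {
          // Load the user's main class by name and look up its static main(Array[String]).
          val userClass  = "com.example.MyApp"   // hypothetical user main class
          val mainMethod = Class.forName(userClass).getMethod("main", classOf[Array[String]])
          // A static method has no receiver, hence null; the array is the single parameter.
          mainMethod.invoke(null, Array("arg1"))
        }
      }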
      
    5. Creating the SparkContext

      At this point the work of requesting the first NM from the RM and starting the ApplicationMaster and Driver processes on it is complete. Next, the mainClass the user configured runs; when it reaches new SparkContext, a SparkContext instance is created. During construction, two particularly important objects are created: taskScheduler and schedulerBackend. Working together, they submit the task sets produced by the DAGScheduler to the Executors, request task resources as needed, and track the tasks' execution state.

      The user program executes new SparkContext():
      org.apache.spark.SparkContext#new (the try-catch block spanning lines 371-583)
          397 // Set basic information such as the jar files
              _conf.set(DRIVER_HOST_ADDRESS,_conf.get(DRIVER_HOST_ADDRESS))
              _conf.setIfMissing("spark.driver.port", "0")
              _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
              _jars = Utils.getUserJars(_conf)
              _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
          428 // Create the ListenerBus, which is used for rendering the Spark UI
              _jobProgressListener = new JobProgressListener(_conf)
              listenerBus.addListener(jobProgressListener)
          432 // Create the SparkEnv
              _env = createSparkEnv(_conf, isLocal, listenerBus)
              SparkEnv.set(_env)
          501 // Create the taskScheduler and schedulerBackend
              val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
              _schedulerBackend = sched
              _taskScheduler = ts
              _dagScheduler = new DAGScheduler(this)
              _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
          540 // Create the dynamic resource allocation manager (covered in section 9)
              _executorAllocationManager =
                if (dynamicAllocationEnabled) {
                  schedulerBackend match {
                    case b: ExecutorAllocationClient =>
                      Some(new ExecutorAllocationManager(
                        schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
                    case _ =>
                      None
                  }
                } else {
                  None
                }
              _executorAllocationManager.foreach(_.start())
      ->org.apache.spark.SparkContext#createTaskScheduler
          2757 // In YARN mode masterUrl is "yarn", and the matching ClusterManager is org.apache.spark.scheduler.cluster.YarnClusterManager
              case masterUrl =>
              val cm = getClusterManager(masterUrl) match {
                  case Some(clusterMgr) => clusterMgr
                  case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
              }
              try {
                // The TaskScheduler here is org.apache.spark.scheduler.cluster.YarnClusterScheduler
                val scheduler = cm.createTaskScheduler(sc, masterUrl)
                // The backend here is org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
                val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
                cm.initialize(scheduler, backend)
                (backend, scheduler)
              } catch {
                case se: SparkException => throw se
                case NonFatal(e) =>
                  throw new SparkException("External scheduler cannot be     instantiated", e)
              }
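
      For context, here is a minimal user program (the app name and logic are arbitrary); its new SparkContext(conf) line is what triggers the whole initialization sequence traced above:

      import org.apache.spark.{SparkConf, SparkContext}

      object MyApp {
        def main(args: Array[String]): Unit = {
          // master/deploy-mode are normally supplied by spark-submit rather than hard-coded
          val conf = new SparkConf().setAppName("my-app")
          val sc = new SparkContext(conf)   // createTaskScheduler, SparkEnv, etc. are built here
          println(sc.parallelize(1 to 100).count())
          sc.stop()
        }
      }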
      
    6. Creating the YarnClusterScheduler

      YarnClusterScheduler extends TaskSchedulerImpl (Spark's main implementation) by way of YarnScheduler. The most important thing this subclass adds is injecting the SparkContext into the ApplicationMaster (picking up from section 5):

      org.apache.spark.scheduler.cluster.YarnClusterManager#createTaskScheduler
          32: A different subclass is instantiated depending on the deploy mode
              override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
                  sc.deployMode match {
                    case "cluster" => new YarnClusterScheduler(sc)
                    case "client" => new YarnScheduler(sc)
                    case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
                  }
              }
      org.apache.spark.scheduler.cluster.YarnClusterScheduler#postStartHook
              private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
              
               logInfo("Created YarnClusterScheduler")
      
               override def postStartHook() {
                 
                 ApplicationMaster.sparkContextInitialized(sc)
                 super.postStartHook()
                 logInfo("YarnClusterScheduler.postStartHook done")
               }
      
              }
      
    7. Creating the YarnClusterSchedulerBackend

      YarnClusterSchedulerBackend likewise extends org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend (Spark's main implementation) by way of YarnSchedulerBackend. The main things this subclass adds are bridging Spark's resource requests over to YARN (described in detail in section 9) and creating the clientEndpoint and driverEndpoint used for communication:

      org.apache.spark.scheduler.cluster.YarnClusterManager#createSchedulerBackend
          // Instantiate a different subclass depending on the deploy mode
          40:override def createSchedulerBackend(sc: SparkContext,
            masterURL: String,
            scheduler: TaskScheduler): SchedulerBackend = {
              sc.deployMode match {
                case "cluster" =>
                  new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
                case "client" =>
                  new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
                case  _ =>
                  throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' forYarn")
              }
          }
          
      -> org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend#start
          36:super.start()
      -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend#start
          // Create the clientEndpoint (YarnSchedulerEndpoint)
          52:private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
              -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receive
                  case RegisterClusterManager(am) =>
                      logInfo(s"ApplicationMaster registered as $am")
                      // amEndpoint is injected here; this happens once the ApplicationMaster has received the SparkContext (see section 8)
                      amEndpoint = Option(am)
                      if (!shouldResetOnAmRegister) {
                        shouldResetOnAmRegister = true
                      } else {
                        // AM is already registered before, this potentially means that AM failed and
                        // a new one registered after the failure. This will only happen in yarn-client mode.
                        reset()
                      }
          86:super.start()
          -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#start
              // Create the driverEndpoint
              378:driverEndpoint = createDriverEndpointRef(properties)
      
    8. After the SparkContext is injected into the ApplicationMaster

      At the end of section 6, ApplicationMaster.sparkContextInitialized injected the SparkContext into the ApplicationMaster; this wakes up the wait inside the ApplicationMaster described in section 4:
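
      The handshake is the standard Scala Promise/Future pattern. Below is a minimal, self-contained sketch of the same idea in plain Scala (not Spark code), with a String standing in for the SparkContext:

      import scala.concurrent.{Await, Promise}
      import scala.concurrent.duration._

      object PromiseHandshake {
        def main(args: Array[String]): Unit = {
          val sparkContextPromise = Promise[String]()            // ApplicationMaster side
          val userThread = new Thread(new Runnable {
            override def run(): Unit = {
              Thread.sleep(500)                                  // "user code" runs for a while...
              sparkContextPromise.success("the SparkContext")    // ...then injects the context
            }
          })
          userThread.start()
          // runDriver-side wait, analogous to ThreadUtils.awaitResult(...)
          val sc = Await.result(sparkContextPromise.future, 10.seconds)
          println(s"got: $sc")
        }
      }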

      org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
          769:master.sparkContextInitialized(sc)
      ->org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
          326:sparkContextPromise.success(sc)
      ->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
          // sc now has a value, so the next steps can proceed
          401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
          Duration(totalWaitTime, TimeUnit.MILLISECONDS))
          if (sc != null) {
              rpcEnv = sc.env.rpcEnv
              // Create the AMEndpoint here and register it with org.apache.spark.scheduler.cluster.YarnSchedulerBackend (this matches section 7)
              val driverRef = runAMEndpoint(
                sc.getConf.get("spark.driver.host"),
                sc.getConf.get("spark.driver.port"),
                isClusterMode = true)
              registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
          }
      ->org.apache.spark.deploy.yarn.ApplicationMaster#runAMEndpoint
          387:amEndpoint =rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
          681:override def onStart(): Unit = {
              driver.send(RegisterClusterManager(self))
          }
      ->org.apache.spark.deploy.yarn.ApplicationMaster#registerAM
          // Create the org.apache.spark.deploy.yarn.YarnAllocator
          // YarnAllocator is responsible for requesting containers from the YARN ResourceManager and deciding what to do with containers once YARN grants them
          359:allocator = client.register(driverUrl,
                  driverRef,
                  yarnConf,
                  _sparkConf,
                  uiAddress,
                  historyAddress,
                  securityMgr,
                  localResources)
          // Initial resource request
          368:allocator.allocateResources() 
          // Start a reporter thread that requests resources on demand while the job runs; it also calls allocator.allocateResources()
          369:reporterThread = launchReporterThread()
      
    9. Requesting resources

      Resource requests are carried out mainly by org.apache.spark.deploy.yarn.YarnAllocator and are triggered by org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which sends an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message to update the required executor count, targetNumExecutors.

      // Containers are requested from YARN and Executors are launched mainly through this method
      org.apache.spark.deploy.yarn.YarnAllocator#allocateResources
          // First compute how many resources need to be requested
          260:updateResourceRequests()
              -> org.apache.spark.deploy.yarn.YarnAllocator#updateResourceRequests
                  297:val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
                  // At initialization, targetNumExecutors is the maximum of
                  //   spark.dynamicAllocation.minExecutors
                  //   spark.dynamicAllocation.initialExecutors
                  //   spark.executor.instances (see the Seq(...).max below)
                  109: @volatile private var targetNumExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
                  275: val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
                  
              -> org.apache.spark.util.Utils#getDynamicAllocationInitialExecutors
                  2538:val initialExecutors = Seq(
                      conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
                      conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
                      conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
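                      // Worked example (assumed, illustrative config values): the largest of the
                      // three wins, exactly as the Seq(...).max above computes.
                      //   spark.dynamicAllocation.minExecutors     = 1
                      //   spark.dynamicAllocation.initialExecutors = 2
                      //   spark.executor.instances                 = 0 (unset)
                      //   => initialExecutors = Seq(1, 2, 0).max = 2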
                      
                      
          276:handleAllocatedContainers(allocatedContainers.asScala)
              -> org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers
                  442:runAllocatedContainers(containersToUse)
              -> org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers
                  511:new ExecutorRunnable(
                            Some(container),
                            conf,
                            sparkConf,
                            driverUrl,
                            executorId,
                            executorHostname,
                            executorMemory,
                            executorCores,
                            appAttemptId.getApplicationId.toString,
                            securityMgr,
                            localResources
                          ).run()
              -> org.apache.spark.deploy.yarn.ExecutorRunnable#run
                  65:startContainer()
              -> org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer
                  // The command to be run on the NodeManager
                  98:val commands = prepareCommand()
                  // Start the container (this launches the Executor on the NodeManager)
                  122:nmClient.startContainer(container.get, ctx)
              -> org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand
                  201:val commands = prefixEnv ++
                      Seq(Environment.JAVA_HOME.?() + "/bin/java", "-server") ++
                      javaOpts ++
                      // This is the key line: this class is what gets launched in the container (covered in section 10)
                      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
                        "--driver-url", masterAddress,
                        "--executor-id", executorId,
                        "--hostname", hostname,
                        "--cores", executorCores.toString,
                        "--app-id", appId) ++
                      userClassPath ++
                      Seq(
                        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
                        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
                        
      

      Section 5 mentioned that the SparkContext creates an executorAllocationManager, the dynamic resource allocation manager. It takes effect when spark.dynamicAllocation.enabled is set to true and requests resources dynamically while the job is running.
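
      As a hedged configuration sketch (the values are arbitrary examples, not recommendations), the settings involved look like this:

      import org.apache.spark.SparkConf

      // Example values only; tune these for the actual cluster.
      val conf = new SparkConf()
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.minExecutors", "1")
        .set("spark.dynamicAllocation.initialExecutors", "2")
        .set("spark.dynamicAllocation.maxExecutors", "20")
        // dynamic allocation on YARN also requires the external shuffle service
        .set("spark.shuffle.service.enabled", "true")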

      org.apache.spark.ExecutorAllocationManager requests resources by calling the requestTotalExecutors method of org.apache.spark.ExecutorAllocationClient.

      org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend implements that interface.

      CoarseGrainedSchedulerBackend in turn delegates to doRequestTotalExecutors, which is overridden in its subclass org.apache.spark.scheduler.cluster.YarnSchedulerBackend and ultimately performs the resource request.

      org.apache.spark.SparkContext#new
          552:_executorAllocationManager.foreach(_.start())
      -> org.apache.spark.ExecutorAllocationManager#start
          222:val scheduleTask = new Runnable() {
                override def run(): Unit = {
                  try {
                    schedule()
                  } catch {
                    case ct: ControlThrowable =>
                      throw ct
                    case t: Throwable =>
                      logWarning(s"Uncaught exception in thread  ${Thread.currentThread().getName}", t)
                  }
                }
              }
              // Schedule a recurring task that drives the resource scheduling
              executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
      -> org.apache.spark.ExecutorAllocationManager#schedule
          281: updateAndSyncNumExecutorsTarget(now)
      -> org.apache.spark.ExecutorAllocationManager#updateAndSyncNumExecutorsTarget
          // maxNumExecutorsNeeded is computed from
          //   listener.totalPendingTasks (its update is triggered at line 261 of CoarseGrainedSchedulerBackend)
          //   listener.totalRunningTasks (its update is triggered at line 996 of DAGScheduler)
          //   spark.executor.cores (user-defined, default 1)
          // the detailed computation and update paths are sketched in the call chains at the end of this section
          310: val maxNeeded = maxNumExecutorsNeeded
          331: val delta = addExecutors(maxNeeded)
          380: val addRequestAcknowledged = testing ||
        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
          
      -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestTotalExecutors
          548: doRequestTotalExecutors(numExecutors)
      
      ->org.apache.spark.scheduler.cluster.YarnSchedulerBackend#doRequestTotalExecutors
          // Build an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message and send it via the clientEndpoint
          138:yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
      
      ->org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receiveAndReply    
          // Finally, forward the RequestExecutors message to the ApplicationMaster
          280:case r: RequestExecutors =>
                 amEndpoint match {
                   case Some(am) =>
                     am.ask[Boolean](r).andThen {
                       case Success(b) => context.reply(b)
                       case Failure(NonFatal(e)) =>
                         logError(s"Sending $r to AM was unsuccessful", e)
                         context.sendFailure(e)
                     }(ThreadUtils.sameThread)
                   case None =>
                     logWarning("Attempted to request executors before the AM has registered!")
                     context.reply(false)
                 }
      ->org.apache.spark.deploy.yarn.ApplicationMaster.AMEndpoint#receiveAndReply
          696: a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
                r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)
      -> org.apache.spark.deploy.yarn.YarnAllocator#requestTotalExecutorsWithPreferredLocalities
          // targetNumExecutors is updated here, so the next call to org.apache.spark.deploy.yarn.YarnAllocator#allocateResources sees a different targetNumExecutors; requestedTotal ultimately comes from the computation in org.apache.spark.ExecutorAllocationManager#schedule
          218: targetNumExecutors = requestedTotal
      
      DAGScheduler maintains the pending-task count
      org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#totalPendingTasks 722
          org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#stageIdToNumTasks 583
              org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#onStageSubmitted 595
                  org.apache.spark.scheduler.SparkListenerBus#doPostEvent 33
                      org.apache.spark.scheduler.DAGScheduler#submitMissingTasks 986
      
      
      
      CoarseGrainedSchedulerBackend maintains the running-task count
      org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#totalRunningTasks 731
          org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#numRunningTasks 587
              org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#onTaskStart 652
                  org.apache.spark.scheduler.SparkListenerInterface#onTaskStart 41
                      org.apache.spark.scheduler.DAGScheduler#handleBeginEvent 807
                          org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive 1705
                              org.apache.spark.scheduler.DAGScheduler#taskStarted 206
                                  org.apache.spark.scheduler.TaskSetManager#resourceOffer 498
                                      org.apache.spark.scheduler.TaskSchedulerImpl#resourceOfferSingleTaskSet 295
                                          org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers 375
                                              org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers 261
      
      
      ExecutorAllocationManager dynamically computes the number of executors needed
      org.apache.spark.ExecutorAllocationManager#schedule 278
          org.apache.spark.ExecutorAllocationManager#updateAndSyncNumExecutorsTarget 310
              org.apache.spark.ExecutorAllocationManager#maxNumExecutorsNeeded 264
      

      An open question: org.apache.spark.SparkContext#requestExecutors can also request resources by calling org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestExecutors, and that method's logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") line does appear in the Spark Driver log, but it is unclear who calls this method.
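
      One possible caller is user code itself: requestExecutors is exposed on SparkContext as a developer API, so an explicit call like the sketch below (assuming an active SparkContext named sc) would also produce that log line; this is only an illustration, not an answer to which internal component triggers it.

      // Hypothetical usage; `sc` is an already-created SparkContext.
      val granted: Boolean = sc.requestExecutors(2)   // ask the cluster manager for 2 more executors
      if (!granted) println("the cluster manager rejected or ignored the request")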

    10. The org.apache.spark.executor.CoarseGrainedExecutorBackend class

      Once the resources requested in section 8 have been granted, an org.apache.spark.executor.CoarseGrainedExecutorBackend process is started on the NodeManager. These processes have a many-to-one relationship with org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend. As the name suggests, CoarseGrainedExecutorBackend is the manager of an Executor: it holds an Executor object, and the Executor runs Tasks on a thread pool it starts. This ties the whole flow together.
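
      As a minimal sketch of that last step (plain JVM code, not Spark's Executor class), a cached thread pool running a few "tasks" looks like this:

      import java.util.concurrent.{Executors, TimeUnit}

      object MiniExecutor {
        def main(args: Array[String]): Unit = {
          val pool = Executors.newCachedThreadPool()   // same pool type Executor uses
          (1 to 4).foreach { i =>
            pool.execute(new Runnable {
              override def run(): Unit =
                println(s"running task $i on ${Thread.currentThread().getName}")
            })
          }
          pool.shutdown()
          pool.awaitTermination(10, TimeUnit.SECONDS)
        }
      }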

      org.apache.spark.executor.CoarseGrainedExecutorBackend#main
          284:run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
      -> org.apache.spark.executor.CoarseGrainedExecutorBackend#run
          226:env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
          env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      -> org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart
          // Register itself with the CoarseGrainedSchedulerBackend
          63:ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
      -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply  
          // If the checks pass, tell the CoarseGrainedExecutorBackend to create its Executor
          195: executorRef.send(RegisteredExecutor)
      -> org.apache.spark.executor.CoarseGrainedExecutorBackend#receive
          83: executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      -> org.apache.spark.executor.Executor#new
          101:Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
      
    11. 至此SparkContext创建流程已经全部完毕,SparkContext创建成功后,就会开始执行用户代码,先构建RDD的逻辑关系图,然后遇到action会切分stage形成物理执行图,然后通过SparkContext.runJob执行,具体分析可见 github.com/JerryLead/S…