Spark version: 2.2. Source code: github.com/apache/spar…
-
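Introduction
In YARN-cluster mode, after a user submits an application to YARN, YARN runs it in two phases. In the first phase, the Spark Driver is started inside the YARN cluster as an ApplicationMaster. In the second phase, the ApplicationMaster creates the application, requests resources for it from the ResourceManager, launches Executors to run Tasks, and monitors the whole run until it completes.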
-
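Description
The YARN-cluster workflow consists of the following steps:
- The Spark Yarn Client submits the application to YARN, including the ApplicationMaster program, the command that starts the ApplicationMaster, the program to run in the Executors, and so on.
- On receiving the request, the ResourceManager picks a NodeManager in the cluster, allocates the application's first Container on it, and asks it to start the application's ApplicationMaster in that Container. The ApplicationMaster then initializes the SparkContext and related objects.
- The ApplicationMaster registers with the ResourceManager, so the user can check the application's status directly through the ResourceManager. It then polls over RPC to request resources for the tasks and monitors them until they finish.
- Once the ApplicationMaster has obtained resources (that is, Containers), it contacts the corresponding NodeManager and asks it to start CoarseGrainedExecutorBackend in the Container. After starting, CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and asks for Tasks. This is the same as in Standalone mode, except that when the SparkContext is initialized in the Spark Application it schedules tasks with CoarseGrainedSchedulerBackend together with YarnClusterScheduler, where YarnClusterScheduler is only a thin wrapper around TaskSchedulerImpl.
- The SparkContext in the ApplicationMaster assigns Tasks to CoarseGrainedExecutorBackend, which runs them and reports status and progress back to the ApplicationMaster. The ApplicationMaster therefore always knows the state of each task and can restart a task when it fails.
- When the application finishes, the ApplicationMaster asks the ResourceManager to unregister it and shuts itself down.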
-
Diagram
-
Source code walkthrough
-
Submitting the command
The user submits the job with the ${SPARK_HOME}/bin/spark-submit script, which runs the main function of the SparkSubmit class and starts the main process:
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
-
Parsing the command arguments
SparkSubmit parses the script arguments, in particular master and deployMode. In yarn-cluster mode it sets childMainClass, the class to be launched next, to org.apache.spark.deploy.yarn.Client, and passes the user's primary resource and main class (mainClass) along as the --jar and --class arguments. The effect is that a Container on a NodeManager is requested first to host the ApplicationMaster, and the Driver (the user's main class) is then started inside the ApplicationMaster.
org.apache.spark.deploy.SparkSubmit, lines 584-603:
    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = "org.apache.spark.deploy.yarn.Client"
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
          childArgs += ("--jar", args.primaryResource)
        }
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }
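The argument rewriting above can be tried out without Spark. The following is a minimal sketch of the plain JVM branch only (the Python and R branches are left out); `SubmitArgs` and `YarnClusterArgsSketch` are hypothetical names invented for this illustration, not Spark classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for SparkSubmit's parsed arguments.
final case class SubmitArgs(
    primaryResource: String,
    mainClass: String,
    childArgs: Seq[String])

object YarnClusterArgsSketch {
  // Returns the class to launch next and the arguments handed to it.
  def buildChildArgs(args: SubmitArgs): (String, Seq[String]) = {
    val childMainClass = "org.apache.spark.deploy.yarn.Client"
    val out = ArrayBuffer.empty[String]
    out ++= Seq("--jar", args.primaryResource)
    out ++= Seq("--class", args.mainClass)
    // Every user argument is wrapped as its own "--arg <value>" pair.
    args.childArgs.foreach(arg => out ++= Seq("--arg", arg))
    (childMainClass, out.toList)
  }

  def main(argv: Array[String]): Unit = {
    val (cls, childArgs) =
      buildChildArgs(SubmitArgs("app.jar", "com.example.Main", Seq("in", "out")))
    println(cls + " " + childArgs.mkString(" "))
  }
}
```

The user's main class ends up as data (`--class ...`) for the YARN client rather than as the class that SparkSubmit itself runs.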
-
Launching org.apache.spark.deploy.yarn.Client
The childMainClass produced by argument parsing is run through reflection:
    org.apache.spark.deploy.SparkSubmit#main
    119: case SparkSubmitAction.SUBMIT => submit(appArgs)
    -> org.apache.spark.deploy.SparkSubmit#submit
    162: runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    -> org.apache.spark.deploy.SparkSubmit#runMain
    // Note: childMainClass here is org.apache.spark.deploy.yarn.Client. This Client asks the YARN RM
    // for a Container on an NM and starts the AM in it.
    // The AM then starts the class given by the user's mainClass argument.
    712: mainClass = Utils.classForName(childMainClass)
    739: val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    755: mainMethod.invoke(null, childArgs.toArray)
org.apache.spark.deploy.yarn.Client uses org.apache.hadoop.yarn.client.api.YarnClient to submit the application to YARN:
    org.apache.spark.deploy.yarn.Client#main
    1150: new Client(args, sparkConf).run()
    -> org.apache.spark.deploy.yarn.Client#run
    1091: this.appId = submitApplication()
    org.apache.spark.deploy.yarn.Client#submitApplication
    161: val containerContext = createContainerLaunchContext(newAppResponse)
    174: yarnClient.submitApplication(appContext)
    // appContext wraps the command that starts the ApplicationMaster
    org.apache.spark.deploy.yarn.Client#createContainerLaunchContext
    887: val userClass =
      if (isClusterMode) {
        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
      } else {
        Nil
      }
    910: val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    923: val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
        Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
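To make the shape of the ApplicationMaster launch command easier to see, here is a rough, Spark-free sketch of how such a command line could be assembled. The placeholder strings `{{JAVA_HOME}}` and `<LOG_DIR>` and the `--arg` wrapping of user arguments are assumptions of this sketch, and `AmCommandSketch` is a hypothetical name; the real code takes these values from the YARN API and handles more options.

```scala
object AmCommandSketch {
  // Placeholders expanded later on the NodeManager; hard-coded here as an assumption.
  val JavaHome = "{{JAVA_HOME}}"
  val LogDir = "<LOG_DIR>"

  def amCommand(
      isClusterMode: Boolean,
      userClass: String,
      userArgs: Seq[String],
      javaOpts: Seq[String]): Seq[String] = {
    // Cluster mode runs the full ApplicationMaster; client mode only needs an executor launcher.
    val amClass =
      if (isClusterMode) "org.apache.spark.deploy.yarn.ApplicationMaster"
      else "org.apache.spark.deploy.yarn.ExecutorLauncher"
    // The user class is only passed along in cluster mode, where the AM hosts the Driver.
    val userClassArgs = if (isClusterMode) Seq("--class", userClass) else Nil
    val argArgs = userArgs.flatMap(a => Seq("--arg", a))
    Seq(JavaHome + "/bin/java", "-server") ++ javaOpts ++
      Seq(amClass) ++ userClassArgs ++ argArgs ++
      Seq("1>", LogDir + "/stdout", "2>", LogDir + "/stderr")
  }

  def main(args: Array[String]): Unit =
    println(amCommand(isClusterMode = true, "com.example.Main", Seq("x"), Seq("-Xmx1g")).mkString(" "))
}
```

The only difference between the two deploy modes in this sketch is which class is started and whether the user class is handed to it.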
-
ApplicationMaster & Driver startup
The ApplicationMaster decides from its arguments whether it is running in cluster mode or client mode. In cluster mode it starts the mainClass that the user originally passed to the spark-submit script (the user code runs in its own thread, userClassThread) and then waits for the SparkContext to be injected. Normally the user code instantiates a SparkContext, and during instantiation the SparkContext injects itself into the ApplicationMaster through its TaskScheduler (YarnClusterScheduler); Section 6 covers the details.
    org.apache.spark.deploy.yarn.ApplicationMaster#main
    763: master = new ApplicationMaster(amArgs, new YarnRMClient)
    System.exit(master.run())
    -> org.apache.spark.deploy.yarn.ApplicationMaster#run
    253: if (isClusterMode) {
      runDriver(securityMgr)
    } else {
      runExecutorLauncher(securityMgr)
    }
    -> org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
    // Start the user code
    394: userClassThread = startUserApplication()
    // Wait for the SparkContext to be injected
    401: val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    -> org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication
    629: val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
    635: mainMethod.invoke(null, userArgs.toArray)
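The "start the user code, then wait for the context" handshake is a standard Promise/Future handoff between two threads. Below is a minimal sketch of that pattern using only the Scala standard library; `FakeContext` and `ContextHandoffSketch` are hypothetical names, and the real ApplicationMaster does considerably more (error handling, retries, security setup).

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Hypothetical stand-in for SparkContext.
final case class FakeContext(appName: String)

object ContextHandoffSketch {
  // Runs userMain in its own thread and blocks until it hands over a context.
  def runDriver(userMain: Promise[FakeContext] => Unit, totalWaitMillis: Long): FakeContext = {
    val contextPromise = Promise[FakeContext]()
    val userThread = new Thread(new Runnable {
      override def run(): Unit = userMain(contextPromise)
    }, "Driver")
    userThread.setDaemon(true)
    userThread.start()
    // Throws a TimeoutException if the user code never completes the promise in time.
    Await.result(contextPromise.future, Duration(totalWaitMillis, TimeUnit.MILLISECONDS))
  }

  def main(args: Array[String]): Unit = {
    val sc = runDriver(p => { p.success(FakeContext("demo")); () }, 5000L)
    println(sc.appName)
  }
}
```

The waiting side never polls: it is released at the moment the user thread calls `success` on the shared promise.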
-
Creating the SparkContext
At this point the first Container has been obtained from the RM, and the ApplicationMaster and the Driver have been started in it. Next the user's mainClass runs. When it reaches new SparkContext, a SparkContext instance is created, and two especially important objects are created along the way: taskScheduler and schedulerBackend. Working together, they submit the task sets created by the DAGScheduler to the Executors, request resources as needed, and track task status.
    // The user program executes new SparkContext()
    org.apache.spark.SparkContext#new (the try-catch block at lines 371-583)
    397: // Set basic information such as the jars
    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
    _conf.setIfMissing("spark.driver.port", "0")
    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
    428: // Register the listener on the ListenerBus that feeds the Spark UI
    _jobProgressListener = new JobProgressListener(_conf)
    listenerBus.addListener(jobProgressListener)
    432: // Create the SparkEnv
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
    501: // Create taskScheduler and schedulerBackend
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    540: // Create the dynamic allocation manager (see Section 9)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
          case _ => None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())
    -> org.apache.spark.SparkContext#createTaskScheduler
    2757: // In YARN mode masterUrl is "yarn", and the matching ClusterManager is
    // org.apache.spark.scheduler.cluster.YarnClusterManager
    case masterUrl =>
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        // The TaskScheduler is org.apache.spark.scheduler.cluster.YarnClusterScheduler
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        // The backend is org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
-
Creating YarnClusterScheduler
YarnClusterScheduler extends TaskSchedulerImpl (Spark's main scheduler implementation) by way of YarnScheduler. The most important thing the subclass adds is injecting the SparkContext into the ApplicationMaster (continuing from Section 5).
    org.apache.spark.scheduler.cluster.YarnClusterManager#createTaskScheduler
    32: // A different subclass is created for each deploy mode
    override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
      sc.deployMode match {
        case "cluster" => new YarnClusterScheduler(sc)
        case "client" => new YarnScheduler(sc)
        case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
      }
    }
    org.apache.spark.scheduler.cluster.YarnClusterScheduler#postStartHook
    private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
      logInfo("Created YarnClusterScheduler")
      override def postStartHook() {
        ApplicationMaster.sparkContextInitialized(sc)
        super.postStartHook()
        logInfo("YarnClusterScheduler.postStartHook done")
      }
    }
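The injection relies on an ordinary overridden hook method: the subclass does its extra work first and then delegates to the parent. A minimal sketch of that idea follows; `BaseScheduler`, `ClusterScheduler`, and `HookSketch` are hypothetical names, and the log strings are invented for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the generic scheduler (the role of TaskSchedulerImpl).
class BaseScheduler(log: ArrayBuffer[String]) {
  def postStartHook(): Unit = {
    log += "base: hook"
  }
}

// Hypothetical stand-in for the YARN cluster scheduler.
class ClusterScheduler(log: ArrayBuffer[String], notifyMaster: () => Unit)
    extends BaseScheduler(log) {
  override def postStartHook(): Unit = {
    notifyMaster() // plays the role of ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    log += "cluster: hook done"
  }
}

object HookSketch {
  // Returns the order in which the three steps happened.
  def run(): List[String] = {
    val log = ArrayBuffer.empty[String]
    val scheduler: BaseScheduler =
      new ClusterScheduler(log, () => { log += "master: context received"; () })
    scheduler.postStartHook()
    log.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because the caller only sees the base type, the code that triggers the hook does not need to know it is running on YARN.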
-
Creating YarnClusterSchedulerBackend
YarnClusterSchedulerBackend likewise extends org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend (Spark's main backend implementation) by way of YarnSchedulerBackend. The subclass mainly does two things: it bridges Spark's resource requests to YARN (covered in Section 9), and it creates the clientEndpoint and driverEndpoint used for communication.
    org.apache.spark.scheduler.cluster.YarnClusterManager#createSchedulerBackend
    // A different subclass is instantiated for each deploy mode
    40: override def createSchedulerBackend(
        sc: SparkContext,
        masterURL: String,
        scheduler: TaskScheduler): SchedulerBackend = {
      sc.deployMode match {
        case "cluster" =>
          new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
        case "client" =>
          new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
        case _ =>
          throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
      }
    }
    -> org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend#start
    36: super.start()
    -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend#start
    // Create the clientEndpoint
    52: private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
    -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receive
    case RegisterClusterManager(am) =>
      logInfo(s"ApplicationMaster registered as $am")
      // amEndpoint is injected here; this happens in Section 8, after the SparkContext
      // has been injected into the ApplicationMaster
      amEndpoint = Option(am)
      if (!shouldResetOnAmRegister) {
        shouldResetOnAmRegister = true
      } else {
        // AM is already registered before, this potentially means that AM failed and
        // a new one registered after the failure. This will only happen in yarn-client mode.
        reset()
      }
    86: super.start()
    -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#start
    // Create the driverEndpoint
    378: driverEndpoint = createDriverEndpointRef(properties)
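The RegisterClusterManager handler distinguishes a first registration from a re-registration with a single boolean flag. The sketch below isolates that logic; `SchedulerEndpointSketch` is a hypothetical class, the AM reference is modelled as a plain String, and a counter stands in for the real reset() call.

```scala
// Hypothetical driver-side endpoint; not Spark's YarnSchedulerEndpoint.
class SchedulerEndpointSketch {
  private var amEndpoint: Option[String] = None
  private var shouldResetOnAmRegister = false
  private var resetCount = 0

  def registeredAm: Option[String] = amEndpoint
  def resets: Int = resetCount

  def onRegisterClusterManager(am: String): Unit = {
    amEndpoint = Option(am)
    if (!shouldResetOnAmRegister) {
      // First registration: just remember that an AM has been seen.
      shouldResetOnAmRegister = true
    } else {
      // A second registration means a new AM replaced an earlier one.
      resetCount += 1
    }
  }
}

object SchedulerEndpointDemo {
  def main(args: Array[String]): Unit = {
    val ep = new SchedulerEndpointSketch
    ep.onRegisterClusterManager("am-1")
    ep.onRegisterClusterManager("am-2")
    println(s"am=${ep.registeredAm}, resets=${ep.resets}")
  }
}
```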
-
What happens after the SparkContext is injected into the ApplicationMaster
At the end of Section 6 the SparkContext was injected into the ApplicationMaster through ApplicationMaster.sparkContextInitialized. This releases the wait in the ApplicationMaster described in Section 4.
    org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
    769: master.sparkContextInitialized(sc)
    -> org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
    326: sparkContextPromise.success(sc)
    -> org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
    // sc now has a value, so the next step can begin
    401: val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      rpcEnv = sc.env.rpcEnv
      // Create the AMEndpoint and register it with
      // org.apache.spark.scheduler.cluster.YarnSchedulerBackend (see Section 7)
      val driverRef = runAMEndpoint(
        sc.getConf.get("spark.driver.host"),
        sc.getConf.get("spark.driver.port"),
        isClusterMode = true)
      registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
    }
    -> org.apache.spark.deploy.yarn.ApplicationMaster#runAMEndpoint
    387: amEndpoint = rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
    681: override def onStart(): Unit = {
      driver.send(RegisterClusterManager(self))
    }
    -> org.apache.spark.deploy.yarn.ApplicationMaster#registerAM
    // Create org.apache.spark.deploy.yarn.YarnAllocator.
    // YarnAllocator requests containers from the YARN ResourceManager and decides
    // what to do with them once YARN grants some of the requests.
    359: allocator = client.register(driverUrl, driverRef, yarnConf, _sparkConf,
      uiAddress, historyAddress, securityMgr, localResources)
    // Initial resource request
    368: allocator.allocateResources()
    // Start the reporter thread, which requests resources on demand while the job runs;
    // it also calls allocator.allocateResources()
    369: reporterThread = launchReporterThread()
-
Requesting resources
Resource requests are carried out mainly by org.apache.spark.deploy.yarn.YarnAllocator and triggered by org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which sends an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message to update the desired number of Executors, targetNumExecutors.
    // This is the main method called to request Containers and launch Executors
    org.apache.spark.deploy.yarn.YarnAllocator#allocateResources
    // First work out how many resources need to be requested
    260: updateResourceRequests()
    -> org.apache.spark.deploy.yarn.YarnAllocator#updateResourceRequests
    297: val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
    // At initialization (with dynamic allocation enabled), targetNumExecutors is the maximum of
    //   spark.dynamicAllocation.minExecutors
    //   spark.dynamicAllocation.initialExecutors
    //   spark.executor.instances
    // as the Seq(...).max below shows
    109: @volatile private var targetNumExecutors =
      YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
    275: val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
    -> org.apache.spark.util.Utils#getDynamicAllocationInitialExecutors
    2538: val initialExecutors = Seq(
      conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
      conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
      conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
    276: handleAllocatedContainers(allocatedContainers.asScala)
    -> org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers
    442: runAllocatedContainers(containersToUse)
    -> org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers
    511: new ExecutorRunnable(
      Some(container),
      conf,
      sparkConf,
      driverUrl,
      executorId,
      executorHostname,
      executorMemory,
      executorCores,
      appAttemptId.getApplicationId.toString,
      securityMgr,
      localResources
    ).run()
    -> org.apache.spark.deploy.yarn.ExecutorRunnable#run
    65: startContainer()
    -> org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer
    // The command to run on the NodeManager
    98: val commands = prepareCommand()
    // Start the Container on the NodeManager
    122: nmClient.startContainer(container.get, ctx)
    -> org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand
    201: val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      // This is the key line: it shows which class gets started (see Section 10)
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
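The two pieces of arithmetic in this section are small enough to check by hand. The sketch below restates them as pure functions: the initial target as the maximum of three settings, and the shortfall as target minus pending minus running. `AllocatorMathSketch` and `containersToRequest` are hypothetical names; clamping the shortfall at zero is an assumption of this sketch, not something the excerpt shows.

```scala
object AllocatorMathSketch {
  // Largest of the three settings; an unset executor count is treated as 0.
  def initialTarget(
      minExecutors: Int,
      initialExecutors: Int,
      executorInstances: Option[Int]): Int =
    Seq(minExecutors, initialExecutors, executorInstances.getOrElse(0)).max

  // Executors still missing: neither requested yet nor already running.
  def missing(target: Int, pendingAllocate: Int, running: Int): Int =
    target - pendingAllocate - running

  // Assumption: only a positive shortfall leads to new container requests.
  def containersToRequest(target: Int, pendingAllocate: Int, running: Int): Int =
    math.max(missing(target, pendingAllocate, running), 0)

  def main(args: Array[String]): Unit = {
    val target = initialTarget(minExecutors = 1, initialExecutors = 3, executorInstances = Some(2))
    println(s"target=$target, toRequest=${containersToRequest(target, 1, 1)}")
  }
}
```

For example, with a target of 10, 3 requests pending and 4 executors running, 3 more containers are missing.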
Section 5 mentioned that creating the SparkContext also creates executorAllocationManager, the dynamic allocation manager. It takes effect when spark.dynamicAllocation.enabled is set to true and requests resources dynamically while the job runs. org.apache.spark.ExecutorAllocationManager requests resources by calling the requestTotalExecutors method of org.apache.spark.ExecutorAllocationClient. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend implements this interface, and it in turn calls the doRequestTotalExecutors method of its subclass org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend (inherited from YarnSchedulerBackend), which finally issues the request.
    org.apache.spark.SparkContext#new
    552: _executorAllocationManager.foreach(_.start())
    -> org.apache.spark.ExecutorAllocationManager#start
    222: val scheduleTask = new Runnable() {
      override def run(): Unit = {
        try {
          schedule()
        } catch {
          case ct: ControlThrowable => throw ct
          case t: Throwable =>
            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
        }
      }
    }
    // Schedule a periodic task that drives the allocation
    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
    -> org.apache.spark.ExecutorAllocationManager#schedule
    281: updateAndSyncNumExecutorsTarget(now)
    -> org.apache.spark.ExecutorAllocationManager#updateAndSyncNumExecutorsTarget
    // maxNumExecutorsNeeded is computed from
    //   listener.totalPendingTasks (update triggered from DAGScheduler, line 996)
    //   listener.totalRunningTasks (update triggered from CoarseGrainedSchedulerBackend, line 261)
    //   spark.executor.cores (user-defined, default 1)
    // The exact calculation and update logic are still to be filled in.
    310: val maxNeeded = maxNumExecutorsNeeded
    331: val delta = addExecutors(maxNeeded)
    380: val addRequestAcknowledged = testing ||
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
    -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestTotalExecutors
    548: doRequestTotalExecutors(numExecutors)
    -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend#doRequestTotalExecutors
    // Build an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors
    // message and send it through the clientEndpoint
    138: yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
    -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receiveAndReply
    // The RequestExecutors message is finally forwarded to the ApplicationMaster
    280: case r: RequestExecutors =>
      amEndpoint match {
        case Some(am) =>
          am.ask[Boolean](r).andThen {
            case Success(b) => context.reply(b)
            case Failure(NonFatal(e)) =>
              logError(s"Sending $r to AM was unsuccessful", e)
              context.sendFailure(e)
          }(ThreadUtils.sameThread)
        case None =>
          logWarning("Attempted to request executors before the AM has registered!")
          context.reply(false)
      }
    -> org.apache.spark.deploy.yarn.ApplicationMaster.AMEndpoint#receiveAndReply
    696: a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
      r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)
    -> org.apache.spark.deploy.yarn.YarnAllocator#requestTotalExecutorsWithPreferredLocalities
    // This finally changes targetNumExecutors, so the next call to
    // org.apache.spark.deploy.yarn.YarnAllocator#allocateResources sees a different target.
    // The requestedTotal value originates from org.apache.spark.ExecutorAllocationManager#schedule.
    218: targetNumExecutors = requestedTotal
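The end of this chain, "forward the request to the AM if one is registered, otherwise answer false, and let the allocator store the new target", can be reduced to a few lines. The following is a synchronous sketch under simplifying assumptions: `RequestExecutorsMsg`, `AllocatorStub`, and `RequestForwardingSketch` are hypothetical names, the real code is asynchronous RPC, and returning true only when the target actually changes is an assumption of this sketch.

```scala
// Message shape borrowed from the walkthrough; only one field is kept.
final case class RequestExecutorsMsg(requestedTotal: Int)

// Hypothetical stand-in for the allocator: it only remembers the latest target.
class AllocatorStub {
  private var targetNumExecutors = 0

  def target: Int = synchronized { targetNumExecutors }

  // Assumption: report true only when the target actually changed.
  def requestTotal(requestedTotal: Int): Boolean = synchronized {
    if (requestedTotal != targetNumExecutors) {
      targetNumExecutors = requestedTotal
      true
    } else {
      false
    }
  }
}

object RequestForwardingSketch {
  // am stays None until the ApplicationMaster endpoint has registered.
  def forward(am: Option[AllocatorStub], msg: RequestExecutorsMsg): Boolean = am match {
    case Some(allocator) => allocator.requestTotal(msg.requestedTotal)
    case None => false
  }

  def main(args: Array[String]): Unit = {
    val stub = new AllocatorStub
    println(forward(None, RequestExecutorsMsg(4)))
    println(forward(Some(stub), RequestExecutorsMsg(4)))
    println(stub.target)
  }
}
```

The allocator itself does nothing immediately; the new target only matters the next time the allocation loop compares it with the pending and running counts.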
Call chains (the trailing number on each line is the source line):
DAGScheduler maintains the pending tasks:
    org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#totalPendingTasks 722
    org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#stageIdToNumTasks 583
    org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#onStageSubmitted 595
    org.apache.spark.scheduler.SparkListenerBus#doPostEvent 33
    org.apache.spark.scheduler.DAGScheduler#submitMissingTasks 986
CoarseGrainedSchedulerBackend maintains the running tasks:
    org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#totalRunningTasks 731
    org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#numRunningTasks 587
    org.apache.spark.ExecutorAllocationManager.ExecutorAllocationListener#onTaskStart 652
    org.apache.spark.scheduler.SparkListenerInterface#onTaskStart 41
    org.apache.spark.scheduler.DAGScheduler#handleBeginEvent 807
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive 1705
    org.apache.spark.scheduler.DAGScheduler#taskStarted 206
    org.apache.spark.scheduler.TaskSetManager#resourceOffer 498
    org.apache.spark.scheduler.TaskSchedulerImpl#resourceOfferSingleTaskSet 295
    org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers 375
    org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers 261
ExecutorAllocationManager dynamically computes the number of Executors needed:
    org.apache.spark.ExecutorAllocationManager#schedule 278
    org.apache.spark.ExecutorAllocationManager#updateAndSyncNumExecutorsTarget 310
    org.apache.spark.ExecutorAllocationManager#maxNumExecutorsNeeded 264
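The walkthrough does not spell out how the pending and running task counts turn into an executor count. One plausible reading is a ceiling division of the total task count by the number of tasks one executor can run at a time; the sketch below shows that calculation. It is an assumption made for illustration, not code taken from Spark, and `MaxNeededSketch` and the `tasksPerExecutor` parameter are hypothetical names.

```scala
object MaxNeededSketch {
  // Smallest number of executors able to hold all pending and running tasks at once.
  def maxNumExecutorsNeeded(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int = {
    require(tasksPerExecutor > 0, "tasksPerExecutor must be positive")
    val numRunningOrPending = pendingTasks + runningTasks
    // Integer ceiling division: round up when the tasks do not divide evenly.
    (numRunningOrPending + tasksPerExecutor - 1) / tasksPerExecutor
  }

  def main(args: Array[String]): Unit =
    println(maxNumExecutorsNeeded(pendingTasks = 5, runningTasks = 4, tasksPerExecutor = 4))
}
```

With 5 pending and 4 running tasks and 4 task slots per executor, 9 tasks need 3 executors; with 8 tasks in total, 2 would be enough.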
An open question: org.apache.spark.SparkContext#requestExecutors can also request resources, by calling org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestExecutors. The log line in that method, logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager"), does show up in the Spark Driver log, but I have not found out who calls this method.
-
The org.apache.spark.executor.CoarseGrainedExecutorBackend class
In Section 9, once resources have been obtained, an org.apache.spark.executor.CoarseGrainedExecutorBackend process is started on the NodeManager. These processes are in a many-to-one relationship with org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend. As the name suggests, CoarseGrainedExecutorBackend manages an Executor: it holds an Executor object, and the Executor runs Tasks on a thread pool that it starts. That ties the whole flow together.
    org.apache.spark.executor.CoarseGrainedExecutorBackend#main
    284: run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    -> org.apache.spark.executor.CoarseGrainedExecutorBackend#run
    226: env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
    -> org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart
    // Register with CoarseGrainedSchedulerBackend
    63: ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply
    // The checks pass, so tell CoarseGrainedExecutorBackend to create the Executor
    195: executorRef.send(RegisteredExecutor)
    -> org.apache.spark.executor.CoarseGrainedExecutorBackend#receive
    83: executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    -> org.apache.spark.executor.Executor#new
    101: Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
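The last step of the chain, an Executor running Tasks on a cached thread pool, can be illustrated with plain java.util.concurrent. This is only a sketch of the thread pool usage: `ExecutorPoolSketch` is a hypothetical name, and the "tasks" are trivial counters rather than Spark Tasks.

```scala
import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ExecutorPoolSketch {
  // Submits numTasks trivial tasks to a cached thread pool and returns how many finished.
  def runTasks(numTasks: Int): Int = {
    val pool = Executors.newCachedThreadPool().asInstanceOf[ThreadPoolExecutor]
    val finished = new AtomicInteger(0)
    (1 to numTasks).foreach { _ =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          finished.incrementAndGet()
        }
      })
    }
    // Stop accepting new work, then wait for the submitted tasks to complete.
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    finished.get()
  }

  def main(args: Array[String]): Unit = println(runTasks(8))
}
```

A cached pool creates threads on demand and reuses idle ones, which fits a workload where the number of concurrently running tasks varies over time.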
-
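This completes the SparkContext creation flow. Once the SparkContext has been created, the user code starts executing: it first builds the logical graph of RDD dependencies, then, when it reaches an action, splits the graph into stages to form the physical execution plan, and finally runs it through SparkContext.runJob. For a detailed analysis see github.com/JerryLead/S…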
-