Spark Source Code Analysis (6): Registering the Executor with the Driver

The previous article covered how the Executor process is launched. What ultimately gets launched is CoarseGrainedExecutorBackend: executing the launch command runs its main method, which starts the CoarseGrainedExecutorBackend process.

The CoarseGrainedExecutorBackend process is the Executor's daemon process, responsible for creating and maintaining the Executor.

Let's start with the main method. It mainly parses the command-line arguments and then hands them to the run method.

def main(args: Array[String]) {

  // Declare the variables to be filled in from the command-line arguments
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  var argv = args.toList
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        // scalastyle:off println
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        // scalastyle:on println
        printUsageAndExit()
    }
  }

  // Validate the required arguments
  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }

  // Hand the parsed arguments over to run
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  System.exit(0)
}
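
To make the flag names concrete, here is an illustrative argument list in the shape this parser expects. The values are made up for the example; in reality the Worker fills them in when it builds the launch command described in the previous article.

// Illustrative only: a made-up argument list matching the flags parsed above.
val exampleArgs = Array(
  "--driver-url", "spark://CoarseGrainedScheduler@192.168.0.10:7077", // driver's RPC endpoint
  "--executor-id", "0",
  "--hostname", "worker-1",
  "--cores", "4",
  "--app-id", "app-20240101120000-0000",
  "--worker-url", "spark://Worker@worker-1:34567") // only passed in standalone mode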

The run method lives in the companion object of CoarseGrainedExecutorBackend. The key part is that it creates the executor-side SparkEnv and then registers two RpcEndpoints with that SparkEnv's RpcEnv: a CoarseGrainedExecutorBackend instance under the name "Executor", and a WorkerWatcher instance under the name "WorkerWatcher".

// Create the executor-side SparkEnv
val env = SparkEnv.createExecutorEnv(
  driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
// Create a CoarseGrainedExecutorBackend instance and register it with the RpcEnv created above
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
// Create a WorkerWatcher, which monitors the worker and shuts down
// the CoarseGrainedExecutorBackend if anything goes wrong with it
workerUrl.foreach { url =>
  env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
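
The WorkerWatcher implements the fate-sharing mentioned in the --worker-url comment earlier: it watches the RPC connection to its Worker and kills this process if that connection goes away. Below is a simplified sketch of the idea using Spark's internal RPC API, not the actual WorkerWatcher source (which lives in org.apache.spark.deploy.worker and handles a few more cases).

import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}

// Simplified sketch of fate-sharing: if the Worker we were told about
// disconnects, this executor process exits too.
class WorkerWatcherSketch(override val rpcEnv: RpcEnv, workerUrl: String) extends RpcEndpoint {
  private val expectedAddress = RpcAddress.fromURIString(workerUrl)

  override def onDisconnected(remoteAddress: RpcAddress): Unit = {
    if (remoteAddress == expectedAddress) {
      // Our parent Worker is gone, so this executor process should not outlive it.
      System.exit(-1)
    }
  }
}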

The most important piece is the onStart method of the CoarseGrainedExecutorBackend endpoint, which is triggered once the endpoint has been registered with the RpcEnv.

In onStart it sends a RegisterExecutor message to the driver, in other words to the StandaloneSchedulerBackend that TaskSchedulerImpl uses on the driver side.

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // Send the RegisterExecutor message to the driver
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
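
For reference, RegisterExecutor itself is just a small case class among the CoarseGrainedClusterMessages. Judging from how it is constructed and pattern-matched in the snippets in this article, its shape is roughly the following; the field names are inferred from the call sites, so treat this as approximate and check CoarseGrainedClusterMessages.scala for the authoritative definition.

// Approximate shape of the message, inferred from the call sites above.
case class RegisterExecutor(
    executorId: String,
    executorRef: RpcEndpointRef,   // the executor backend's own endpoint, passed as `self`
    hostname: String,
    cores: Int,
    logUrls: Map[String, String])  // from extractLogUrls, shown later in the web UI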

StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend, so the message is received and handled inside CoarseGrainedSchedulerBackend.

When CoarseGrainedSchedulerBackend's receiveAndReply method matches a RegisterExecutor message (it answers via context.reply, which is why the handler lives in receiveAndReply rather than receive), it does the following:

1. Check whether this executor is already registered; if it is a duplicate, send back a RegisterExecutorFailed message and reply immediately;
2. Otherwise, update the in-memory bookkeeping about this executor (address-to-ID map, core counts, executorDataMap, and so on);
3. Reply to the executor that registration succeeded, post a SparkListenerExecutorAdded event, and call makeOffers() to try to schedule tasks.

// Matched when a RegisterExecutor message arrives
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  // Guard against duplicate registration
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    // If the executor's rpc env is not listening for incoming connections, `hostPort`
    // will be null, and the client connection should be used to contact the executor.
    val executorAddress = if (executorRef.address != null) {
        executorRef.address
      } else {
        context.senderAddress
      }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, hostname,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Tell the executor that registration succeeded
    executorRef.send(RegisteredExecutor)
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(true)
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    makeOffers()
  }
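
One detail worth calling out: ExecutorData is constructed with cores passed twice because it tracks both the executor's currently free cores and its total cores. Based on how it is used here, its shape is roughly the following; this is an approximation inferred from the constructor call above, so see ExecutorData.scala for the real definition.

// Approximate shape of ExecutorData, inferred from:
// new ExecutorData(executorRef, executorRef.address, hostname, cores, cores, logUrls)
class ExecutorData(
    val executorEndpoint: RpcEndpointRef,  // used later to send messages such as LaunchTask
    val executorAddress: RpcAddress,
    val executorHost: String,
    var freeCores: Int,                    // decreases as tasks launch, increases as they finish
    val totalCores: Int,
    val logUrlMap: Map[String, String])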

When the CoarseGrainedExecutorBackend side receives the registration-success message, it creates the Executor object.

case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    // Create the Executor object
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }

At this point, the Executor has finished registering with the driver.