
Spark Streaming: An In-Depth Source Code Analysis of JobGenerator's Periodic Job Data-Processing Logic (Spark in Commercial Production)

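This blog series distills and shares cases drawn from real commercial environments, together with Spark source code walkthroughs and practical guidance for production use. Please keep following the series. Copyright notice: this series on Spark source code analysis and commercial practice belongs to the author, Qin Kaixin (秦凯新). Reposting is prohibited; you are welcome to learn from it.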

1 The Backstory of JobGenerator

1.1 ReceiverTracker, JobGenerator's Partner

1.2 JobGenerator, ReceiverTracker's Partner

JobGenerator periodically generates Jobs, and those Jobs are ultimately executed on the Executors.

1.3 How ReceiverTracker and receivedBlockTracker Work Together

  • receivedBlockTracker is a member of ReceiverTracker. Most importantly, receivedBlockTracker maintains streamIdToUnallocatedBlockQueues internally, which tracks the Blocks reported by the Executors.

      class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
        private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
        private val receiverInputStreamIds = receiverInputStreams.map { _.id }
        // Tracks which received blocks are unallocated and which belong to a batch
        private val receivedBlockTracker = new ReceivedBlockTracker(
          ssc.sparkContext.conf,
          ssc.sparkContext.hadoopConfiguration,
          receiverInputStreamIds,
          ssc.scheduler.clock,
          ssc.isCheckpointPresent,
          Option(ssc.checkpointDir)
        )
        // ... (rest of the class omitted in this excerpt)
      }
  • The key metadata structure inside receivedBlockTracker:

      private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
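To make the role of streamIdToUnallocatedBlockQueues more concrete, here is a minimal sketch of a map from stream id to a queue of reported blocks. It is not Spark's code: `BlockInfo`, `UnallocatedQueuesSketch`, `addBlock` and `unallocated` are hypothetical, simplified names chosen for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the block metadata an Executor reports.
final case class BlockInfo(streamId: Int, blockId: String)

// Toy tracker: one FIFO queue of not-yet-allocated blocks per input stream.
class UnallocatedQueuesSketch {
  private val queues = new mutable.HashMap[Int, mutable.Queue[BlockInfo]]

  // Record a reported block under its stream id, creating the queue on first use.
  def addBlock(info: BlockInfo): Unit = synchronized {
    queues.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
  }

  // Snapshot of the blocks of one stream that still wait for a batch.
  def unallocated(streamId: Int): List[BlockInfo] = synchronized {
    queues.get(streamId).map(_.toList).getOrElse(Nil)
  }
}
```

Keeping one queue per stream id preserves arrival order within a stream, which is what later lets a whole queue be drained into a single batch.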

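1.4 How StreamingContext Brings the Two Together

JobScheduler contains two heavyweight core members: jobGenerator and receiverTracker. They are initialized as follows:

Note: the JobGenerator constructor takes the JobScheduler itself as its argument.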

private val jobGenerator = new JobGenerator(this)

receiverTracker = new ReceiverTracker(ssc)

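2 JobGenerator Today

  • RecurringTimer is an important member of JobGenerator. It fires once per user-defined batch interval and posts a GenerateJobs event each time.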
      private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
          longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
  • JobGenerator is started via StreamingContext. On a fresh start (no checkpoint present) this ends up calling startFirstTime.
    def start(): Unit = synchronized {
      if (eventLoop != null) return // generator has already been started
      // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
      // See SPARK-10125
      checkpointWriter
    
      eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
        override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    
        override protected def onError(e: Throwable): Unit = {
          jobScheduler.reportError("Error in job generator", e)
        }
      }
      eventLoop.start()
    
      if (ssc.isCheckpointPresent) {
        restart()
      } else {
        startFirstTime()
      }
    }
  • JobGenerator finally starts ssc.graph and the timer, and from that point the whole processing pipeline is running.
      private def startFirstTime(): Unit = {
        val startTime = new Time(timer.getStartTime())
        graph.start(startTime - graph.batchDuration)
        timer.start(startTime.milliseconds)
        logInfo("Started JobGenerator at " + startTime)
      }
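The start-up arithmetic in startFirstTime can be illustrated with a small sketch. It assumes the timer aligns its first tick to the next multiple of the batch interval, which is my reading of RecurringTimer.getStartTime and should be checked against your Spark version. `StartTimeSketch` and its method names are hypothetical, and times are plain non-negative milliseconds.

```scala
object StartTimeSketch {
  // Next multiple of periodMs strictly after nowMs (nowMs assumed non-negative).
  def nextBatchBoundary(nowMs: Long, periodMs: Long): Long =
    (nowMs / periodMs + 1) * periodMs

  // The first n times at which a GenerateJobs event would be posted.
  def firstTriggers(nowMs: Long, periodMs: Long, n: Int): Seq[Long] = {
    val start = nextBatchBoundary(nowMs, periodMs)
    (0 until n).map(i => start + i * periodMs)
  }

  // Mirrors graph.start(startTime - batchDuration): the graph's start time is
  // one interval before the first trigger.
  def graphStartTime(nowMs: Long, periodMs: Long): Long =
    nextBatchBoundary(nowMs, periodMs) - periodMs
}
```

For example, with a 5000 ms batch interval and a current time of 12345 ms, the first trigger in this sketch is at 15000 ms and the graph start time is 10000 ms, so every batch time is a whole multiple of the interval.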

2.1 The Four Core Processing Steps of JobGenerator

2.2 Step 1: allocateBlocksToBatch

  • JobGenerator holds a reference to jobScheduler, and jobScheduler holds a reference to receiverTracker.

       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
  • receiverTracker holds a reference to receivedBlockTracker.

  • For each streamId, all blocks queued in streamIdToUnallocatedBlockQueues (blocks are cut at the block interval, 200 ms by default) are taken out and put into timeToAllocatedBlocks.

    /**
     * Allocate all unallocated blocks to the given batch.
     * This event will get written to the write ahead log (if enabled).
     */
    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
      if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {

        // First, when the user-defined batch interval fires, drain every block
        // from streamIdToUnallocatedBlockQueues.
        val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
        }.toMap                                                    // <= the key line

        // Then wrap the drained, previously unallocated blocks so they can be
        // recorded under this batch in timeToAllocatedBlocks.
        val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)    // <= the key line

        if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
          timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
          lastAllocatedBatchTime = batchTime
        } else {
          logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
        }
      } else {
        // This situation occurs when:
        // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
        // possibly processed batch job or half-processed batch job need to be processed again,
        // so the batchTime will be equal to lastAllocatedBatchTime.
        // 2. Slow checkpointing makes recovered batch time older than WAL recovered
        // lastAllocatedBatchTime.
        // This situation will only occurs in recovery time.
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    }
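  • timeToAllocatedBlocks is a core member held by receiverTracker (through its receivedBlockTracker member). Tracing the call chain back to its top, the RDDs cached in generatedRDDs are ultimately built from timeToAllocatedBlocks.
  • streamIdToUnallocatedBlockQueues: the blocks not yet allocated to a batch
  • timeToAllocatedBlocks: the blocks already allocated to a batch
  • Below is DStream's template code for generating RDDs. getOrCompute is defined only on DStream, so whenever an upstream DStream generates an RDD, it is stored in that stream's generatedRDDs.
  • If generatedRDDs has no entry for the requested time, compute is called. compute in turn calls the parent's getOrCompute, which calls the parent's compute, and so on, back to the compute of the InputDStream.
    /**
     * Get the RDD corresponding to the given time; either retrieve it from cache
     * or compute-and-cache it.
     */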
    private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    
      // If RDD was already generated, then retrieve it from HashMap,
      // or else compute the RDD
      
      generatedRDDs.get(time).orElse {
        // Compute the RDD if time is valid (e.g. correct time in a sliding window)
        // of RDD generation, else generate nothing.
        if (isTimeValid(time)) {
    
          val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
            // Disable checks for existing output directories in jobs launched by the streaming
            // scheduler, since we may need to write output to an existing directory during checkpoint
            // recovery; see SPARK-4835 for more details. We need to have this call here because
            // compute() might cause Spark jobs to be launched.
            SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
              compute(time)
            }
          }
    
          rddOption.foreach { case newRDD =>
            // Register the generated RDD for caching and checkpointing
            if (storageLevel != StorageLevel.NONE) {
              newRDD.persist(storageLevel)
              logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
            }
            if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
              newRDD.checkpoint()
              logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
            }
            
            generatedRDDs.put(time, newRDD)                           // <= the key line
            
          }
          rddOption
        } else {
          None
        }
      }
    }
  • The compute method of MapPartitionedDStream
       override def compute(validTime: Time): Option[RDD[U]] = {
          parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
        }
  • The compute method of ReceiverInputDStream
    /**
     * Generates RDDs with blocks received by the receiver of this stream.
     */
    override def compute(validTime: Time): Option[RDD[T]] = {
      val blockRDD = {

        if (validTime < graph.startTime) {
          // If this is called for any time before the start time of the context,
          // then this returns an empty RDD. This may happen when recovering from a
          // driver failure without any write ahead log to recover pre-failure data.
          new BlockRDD[T](ssc.sc, Array.empty)
        } else {
          // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
          // for this batch. (The blocks are read from timeToAllocatedBlocks.)
          val receiverTracker = ssc.scheduler.receiverTracker                                       // <= the key line
          val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)     // <= the key line

          // Register the input blocks information into InputInfoTracker
          val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
          ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

          // Create the BlockRDD from the blocks taken out of timeToAllocatedBlocks, so that
          // the downstream call chain can cache and reuse it through generatedRDDs.
          createBlockRDD(validTime, blockInfos)                                                     // <= the key line
        }
      }
      Some(blockRDD)
    }

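In short, the purpose of allocateBlocksToBatch is to move the Blocks belonging to the current batch interval into timeToAllocatedBlocks, where the rest of the call chain can pick them up.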
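The hand-off from the unallocated queues to the per-batch map can be sketched as follows. This is a simplified model, not Spark's implementation: `ToyBlock` and `ToyBlockTracker` are hypothetical names, batch times are plain `Long` milliseconds, and the write-ahead-log step (writeToLog) is left out entirely.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a received block's metadata.
final case class ToyBlock(id: String)

class ToyBlockTracker(streamIds: Seq[Int]) {
  private val unallocated = new mutable.HashMap[Int, mutable.Queue[ToyBlock]]
  private val timeToAllocated = new mutable.HashMap[Long, Map[Int, List[ToyBlock]]]
  private var lastAllocatedBatchTime: Option[Long] = None

  private def queueOf(streamId: Int): mutable.Queue[ToyBlock] =
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[ToyBlock])

  // A receiver reports a new block for one stream.
  def addBlock(streamId: Int, block: ToyBlock): Unit = synchronized {
    queueOf(streamId) += block
  }

  // Drain every stream's queue and record the result under batchTime.
  // Batch times that are not newer than the last allocated one are ignored.
  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    if (lastAllocatedBatchTime.forall(batchTime > _)) {
      val drained = streamIds.map { id =>
        id -> queueOf(id).dequeueAll(_ => true).toList
      }.toMap
      timeToAllocated.put(batchTime, drained)
      lastAllocatedBatchTime = Some(batchTime)
    }
  }

  // What an input stream's compute would ask for when building its batch.
  def getBlocksOfBatch(batchTime: Long): Map[Int, List[ToyBlock]] = synchronized {
    timeToAllocated.getOrElse(batchTime, Map.empty[Int, List[ToyBlock]])
  }
}
```

A block added after a batch has been allocated stays in the queue and lands in the next batch, so each block is assigned to exactly one batch time in this model.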

2.3 Step 2: graph.generateJobs

  • The core role of DStreamGraph is to hold the registered outputStreams. So when are they registered?
  • An output operation registers them: print -> foreachRDD -> ForEachDStream -> register -> ssc.graph.addOutputStream(this)
  • DStreamGraph.generateJobs ultimately calls outputStream.generateJob(time)
      private val inputStreams = new ArrayBuffer[InputDStream[_]]()
      private val outputStreams = new ArrayBuffer[DStream[_]]()
     
      def generateJobs(time: Time): Seq[Job] = {
         logDebug("Generating jobs for time " + time)
         val jobs = this.synchronized {
           outputStreams.flatMap { outputStream =>
             val jobOption = outputStream.generateJob(time)
             jobOption.foreach(_.setCallSite(outputStream.creationSite))
             jobOption
           }
         }
         logDebug("Generated " + jobs.length + " jobs for time " + time)
         jobs
       }
  • outputStream.generateJob defines jobFunc and wraps it in new Job(time, jobFunc)
       private[streaming] def generateJob(time: Time): Option[Job] = {
         getOrCompute(time) match {
           case Some(rdd) =>
             val jobFunc = () => {
               val emptyFunc = { (iterator: Iterator[T]) => {} }
               context.sparkContext.runJob(rdd, emptyFunc)
             }
             Some(new Job(time, jobFunc))
           case None => None
         }
       }
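The path from generateJobs through getOrCompute back to the input stream can be modeled with a toy graph. This is only a sketch under simplifying assumptions: a batch is a `List[Int]` rather than an RDD, and `ToyStream`, `ToyInputStream`, `ToyMappedStream`, `ToyJob`, `ToyGraph` and the `computeCalls` counter are hypothetical names that do not exist in Spark.

```scala
import scala.collection.mutable

abstract class ToyStream {
  private val generated = new mutable.HashMap[Long, List[Int]]
  var computeCalls = 0 // illustration only: counts cache misses

  protected def compute(time: Long): Option[List[Int]]

  // Template method: reuse the cached batch, or compute it once and cache it.
  final def getOrCompute(time: Long): Option[List[Int]] =
    generated.get(time).orElse {
      computeCalls += 1
      val result = compute(time)
      result.foreach(batch => generated.put(time, batch))
      result
    }
}

// Top of the chain: produces the batch itself.
class ToyInputStream(source: Long => List[Int]) extends ToyStream {
  protected def compute(time: Long): Option[List[Int]] = Some(source(time))
}

// A derived stream: its compute asks the parent's getOrCompute first.
class ToyMappedStream(parent: ToyStream, f: Int => Int) extends ToyStream {
  protected def compute(time: Long): Option[List[Int]] =
    parent.getOrCompute(time).map(_.map(f))
}

final case class ToyJob(time: Long, run: () => Unit)

class ToyGraph {
  private val outputStreams = mutable.ArrayBuffer.empty[(ToyStream, List[Int] => Unit)]

  // Plays the role of register -> addOutputStream for an output operation.
  def addOutputStream(stream: ToyStream, action: List[Int] => Unit): Unit = synchronized {
    outputStreams += ((stream, action))
  }

  // One job per registered output; the action runs only when the job is run.
  def generateJobs(time: Long): Seq[ToyJob] = synchronized {
    outputStreams.toList.flatMap { case (stream, action) =>
      stream.getOrCompute(time).map(batch => ToyJob(time, () => action(batch)))
    }
  }
}
```

Registering two outputs on the same derived stream yields two jobs per batch time, while each stream's compute runs only once for that time, because the second lookup is served from the per-stream cache.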

2.4 Step 3: jobScheduler.inputInfoTracker.getInfo(time)

  • This step fetches the input metadata recorded for the Blocks of the corresponding batch.

    // Map to track all the InputInfo related to specific batch time and input stream.
    private val batchTimeToInputInfos =
      new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

    case class StreamInputInfo(
        inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty)
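A two-level map like batchTimeToInputInfos can be exercised with the following sketch. `ToyInputInfo` and `ToyInputInfoTracker` are hypothetical simplifications: batch times are plain `Long` values, the metadata field is dropped, and rejecting a duplicate report for the same stream and batch is a choice made for this sketch.

```scala
import scala.collection.mutable

// Hypothetical, trimmed-down counterpart of StreamInputInfo.
final case class ToyInputInfo(inputStreamId: Int, numRecords: Long)

class ToyInputInfoTracker {
  // batch time -> (input stream id -> info reported for that batch)
  private val batchTimeToInputInfos =
    new mutable.HashMap[Long, mutable.HashMap[Int, ToyInputInfo]]

  // Called by an input stream while it computes its batch.
  def reportInfo(batchTime: Long, info: ToyInputInfo): Unit = synchronized {
    val infos = batchTimeToInputInfos.getOrElseUpdate(
      batchTime, new mutable.HashMap[Int, ToyInputInfo])
    require(!infos.contains(info.inputStreamId),
      s"Stream ${info.inputStreamId} already reported for batch $batchTime")
    infos.put(info.inputStreamId, info)
    ()
  }

  // Immutable snapshot of everything reported for one batch.
  def getInfo(batchTime: Long): Map[Int, ToyInputInfo] = synchronized {
    batchTimeToInputInfos.get(batchTime).map(_.toMap)
      .getOrElse(Map.empty[Int, ToyInputInfo])
  }
}
```

Returning an immutable snapshot from `getInfo` means the caller can attach it to a job set without being affected by later reports.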

2.5 Step 4: jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

  • JobGenerator holds a reference to JobScheduler, which finally submits the Jobs and starts driving computation on the Executors.
    def submitJobSet(jobSet: JobSet): Unit = {
      if (jobSet.jobs.isEmpty) {
        logInfo("No jobs added for time " + jobSet.time)
      } else {
        listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
        jobSets.put(jobSet.time, jobSet)
        jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
        logInfo("Added jobs for time " + jobSet.time)
      }
    }
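The submission pattern in submitJobSet (skip empty job sets, remember the set by batch time, hand every job to a thread pool) can be sketched with the JDK's executor API. `SketchJob`, `SketchJobSet`, `SketchJobScheduler`, `trackedBatches` and `shutdownAndWait` are hypothetical names, the pool size is simply a constructor parameter here, and the listener bus and JobHandler bookkeeping are omitted.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

final case class SketchJob(time: Long, body: () => Unit)
final case class SketchJobSet(time: Long, jobs: Seq[SketchJob])

class SketchJobScheduler(numConcurrentJobs: Int) {
  private val jobSets = new ConcurrentHashMap[Long, SketchJobSet]
  private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

  // Empty job sets are skipped; otherwise every job is queued on the pool.
  def submitJobSet(jobSet: SketchJobSet): Unit = {
    if (jobSet.jobs.nonEmpty) {
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach { job =>
        jobExecutor.execute(new Runnable {
          def run(): Unit = job.body()
        })
      }
    }
  }

  // Number of batch times that currently have a recorded job set.
  def trackedBatches: Int = jobSets.size()

  // Stop accepting work and wait for queued jobs; true if all finished in time.
  def shutdownAndWait(timeoutSeconds: Long): Boolean = {
    jobExecutor.shutdown()
    jobExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)
  }
}
```

With a pool of size 1 the jobs of successive batches run strictly one after another; a larger pool lets jobs overlap, at the cost of weaker ordering between batches.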

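3 Summary

The author spent a great deal of time putting this article together. Please do not simply take it without effort; reposting is prohibited, but you are welcome to learn from it. If you have questions, please leave a comment.

Qin Kaixin (秦凯新), Shenzhen, 2018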
