A Deep Dive into the Source Code of Spark Streaming's Data Cleanup Mechanism in Continuous Operation - Spark in Production


This blog series distills and shares cases drawn from real production environments, together with Spark source code analysis and hands-on guidance. Please stay tuned. Copyright notice: this Spark source code analysis and production practice series belongs to the author (秦凯新, Qin Kaixin). Reproduction is prohibited; you are welcome to learn from it.

1 If You Cannot Clean One Room, How Can You Clean the World?

  • A Spark Streaming application runs continuously. If data only flows in and is never released, then no matter how much memory the cluster has, the application will eventually fall over. Cleaning up in-memory data and on-disk data in a timely manner is therefore critical.
  • So how does a Spark Streaming application reclaim its objects, data, and metadata? Let us start with that question.

2 When Does the Cleanup Happen?

2.1 JobScheduler: From Job Submission to Batch Completion

  • JobGenerator triggers generateJobs for each batch interval
  • JobGenerator -> jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
  • submitJobSet -> jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
  • JobHandler (inside JobScheduler) -> _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  • JobScheduler -> handleJobCompletion(job, completedTime)
  • JobScheduler -> jobGenerator.onBatchCompletion(jobSet.time), once every job of the JobSet has completed
  • JobGenerator -> eventLoop.post(ClearMetadata(time)), which is where the cleanup starts (modeled in the sketch below)
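To make the hand-off concrete, here is a minimal, self-contained model of this event chain in plain Scala (2.12+). The event names JobCompleted and ClearMetadata mirror Spark's internal events, but everything else (EventChainSketch, the single queue-based loop) is a simplification for illustration, not the actual Spark implementation, which runs a dedicated EventLoop per component and a JobHandler per job.

    import java.util.concurrent.{Executors, LinkedBlockingQueue}

    // Toy model of the JobGenerator/JobScheduler hand-off described in the list above.
    object EventChainSketch {
      sealed trait Event
      case class JobCompleted(jobName: String, time: Long) extends Event
      case class ClearMetadata(time: Long) extends Event

      private val eventQueue = new LinkedBlockingQueue[Event]()

      private def handleJobCompletion(jobName: String, time: Long): Unit = {
        println(s"Finished $jobName from job set of time $time")
        // Once the whole JobSet of this batch has completed, Spark's JobScheduler calls
        // jobGenerator.onBatchCompletion(time), which ends up posting ClearMetadata(time).
        eventQueue.put(ClearMetadata(time))
      }

      def main(args: Array[String]): Unit = {
        val jobExecutor = Executors.newFixedThreadPool(2)
        val batchTime = 1000L

        // submitJobSet: every job of the batch is wrapped in a handler and run on the pool.
        jobExecutor.execute(() => {
          println(s"Running job-0 of batch $batchTime")
          eventQueue.put(JobCompleted("job-0", batchTime))
        })

        // Simplified event loop standing in for the EventLoops of JobScheduler/JobGenerator.
        var done = false
        while (!done) {
          eventQueue.take() match {
            case JobCompleted(name, t) => handleJobCompletion(name, t)
            case ClearMetadata(t) =>
              println(s"clearMetadata($t): drop old RDDs, blocks and batch metadata here")
              done = true
          }
        }
        jobExecutor.shutdown()
      }
    }

The point to keep in mind is that ClearMetadata(time) is only posted after the whole batch has finished, so cleanup never races with the output operations of the same batch.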

2.2 clearMetadata: The Dragon Shows Its Head but Not Its Tail

  • The main cached metadata structures:

        // held by InputInfoTracker
        private val batchTimeToInputInfos =
          new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

        // held by ReceivedBlockTracker
        private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]

  • ssc.graph.clearMetadata: walks the outputStreams and clears old generated RDDs, removing their blocks through the BlockManager.

  • jobScheduler.receiverTracker.cleanupOldBlocksAndBatches: clears the cached timeToAllocatedBlocks (and, if enabled, the old write-ahead log files).

  • jobScheduler.inputInfoTracker.cleanup: clears the metadata held in batchTimeToInputInfos.

    private def clearMetadata(time: Time) {

        ssc.graph.clearMetadata(time)                                            // <-- key step

        // If checkpointing is enabled, then checkpoint,
        // else mark batch to be fully processed
        if (shouldCheckpoint) {
          eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))    // <-- key step
        } else {
          // If checkpointing is not enabled, then delete metadata information about
          // received blocks (block data not saved in any case). Otherwise, wait for
          // checkpointing of this batch to complete.
          val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
          jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)  // <-- key step
          jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)                    // <-- key step
          markBatchFullyProcessed(time)
        }
    }
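Note that when shouldCheckpoint is true, the received-block metadata is not cleaned immediately: DoCheckpoint is posted with clearCheckpointDataLater = true, and the tracker cleanup runs later, in clearCheckpointData, once the checkpoint of this batch has been written. Whether that branch is taken is decided on the application side. A hedged, spark-shell style sketch (the path is just a placeholder):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("cleanup-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    // With a checkpoint directory set, clearMetadata takes the DoCheckpoint branch above,
    // and the received-block metadata is cleaned only after the checkpoint is written.
    ssc.checkpoint("/tmp/streaming-checkpoint")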
    

2.3 ssc.graph.clearMetadata: Goodbye, Old RDDs

  /**
   * Clear metadata that are older than `rememberDuration` of this DStream.
   * This is an internal method that should not be called directly. This default
   * implementation clears the old generated RDDs. Subclasses of DStream may override
   * this to clear their own metadata along with the generated RDDs.
   */
  private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    logDebug("Clearing references to old RDDs: [" +
      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
      
    generatedRDDs --= oldRDDs.keys        // <-- key step
    
    if (unpersistData) {
      logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
      oldRDDs.values.foreach { rdd =>
      
        rdd.unpersist(false)               // <-- key step
        
        // Explicitly remove blocks of BlockRDD
        rdd match {
          case b: BlockRDD[_] =>
            logInfo(s"Removing blocks of RDD $b of time $time")
            b.removeBlocks()
          case _ =>
        }
      }
    }
    // Recurse into the parent DStreams so the whole DStream graph gets cleaned as well
    dependencies.foreach(_.clearMetadata(time))
  }
Details of the RDD removal itself (rdd.unpersist ends up in SparkContext.unpersistRDD):
 private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
    env.blockManager.master.removeRdd(rddId, blocking)
    persistentRdds.remove(rddId)
    listenerBus.post(SparkListenerUnpersistRDD(rddId))
  }
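On the user side, two knobs drive how aggressively this path is exercised: the spark.streaming.unpersist flag read at the top of clearMetadata (true by default), and the remember duration, which can be widened with StreamingContext.remember when downstream consumers need old RDDs to stay around longer. A hedged, spark-shell style sketch with example values:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("unpersist-demo")
      .setMaster("local[2]")
      // Default is true (as read by clearMetadata above); set to false to leave old RDDs
      // cached and rely on Spark's ContextCleaner / GC instead.
      .set("spark.streaming.unpersist", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    // Widen the window: generated RDDs are kept for at least 5 minutes before they may be cleared.
    ssc.remember(Minutes(5))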

2.4 cleanupOldBlocksAndBatches: Goodbye, Old Batch Blocks

jobScheduler.receiverTracker.cleanupOldBlocksAndBatches delegates to ReceivedBlockTracker.cleanupOldBatches, shown below:

  /**
   * Clean up block information of old batches. If waitForCompletion is true, this method
   * returns only after the files are cleaned up.
   */
  def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
    require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
    
    val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
    logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")
    
    if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
      timeToAllocatedBlocks --= timesToCleanup
      writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
    } else {
      logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
    }
  }
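writeAheadLogOption above is only defined when the receiver write-ahead log is enabled; without it, only the in-memory timeToAllocatedBlocks map is trimmed and there are no log files to clean. Enabling the WAL is, again, an application-side configuration. A hedged sketch (paths and values are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("wal-demo")
      .setMaster("local[2]")
      // Receiver write-ahead log; old log segments become eligible for the cleanup above.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/streaming-checkpoint")   // the WAL files are stored under this directory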

2.5 inputInfoTracker.cleanup: Goodbye, Old Batch Metadata

  def cleanup(batchThreshTime: Time): Unit = synchronized {
    val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
    logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
    batchTimeToInputInfos --= timesToCleanup
  }
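Both trackers use the same idiom: filter the keys of a time-indexed map that fall below the threshold, then subtract them in one shot with --=. A tiny self-contained illustration in plain Scala (not Spark code; the map and its contents are made up):

    import scala.collection.mutable

    // Toy model: per-batch metadata keyed by batch time in milliseconds.
    val batchTimeToInfo = mutable.HashMap(1000L -> "batch-1", 2000L -> "batch-2", 3000L -> "batch-3")

    def cleanup(threshTime: Long): Unit = {
      // Collect the keys strictly older than the threshold, then drop them in one shot.
      val timesToCleanup = batchTimeToInfo.keys.filter(_ < threshTime).toSeq
      println(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
      batchTimeToInfo --= timesToCleanup
    }

    cleanup(2500L)            // removes 1000L and 2000L
    println(batchTimeToInfo)  // HashMap(3000 -> batch-3)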

3 Summary

If you cannot clean one room, how can you clean the world? The final chapter.

Qin Kaixin, Shenzhen, 1:13, 2018