StreamingContext Startup Flow and DStream Template Source Code Analysis - Spark Streaming in Commercial Environments


This series of blog posts summarizes and shares cases drawn from real commercial environments, and provides Spark source code analysis together with hands-on guidance; please keep following the series. Copyright notice: this series of Spark source code readings and commercial practice belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to learn from it.

1 What Is Spark Streaming?

Spark Streaming is a stream-processing platform built on micro-batching; the block interval currently defaults to 200 ms (spark.streaming.blockInterval), while the batch interval is chosen by the user. Spark Streaming packages the incoming data stream into batches, turns each batch of data into RDDs that are managed by the BlockManager, and finally submits the work for execution as jobs, each described by a DAG (directed acyclic graph).

  • Discretized stream (DStream): a DStream can be thought of as an unbounded collection with no size limit. Because it carries the dimension of time, it can be viewed as a space-time model. A DStream is also a logical concept, since Spark Streaming builds its entire operator system and dependency relationships on DStreams. Note, however, that a DStream is abstract: the computation is ultimately carried out on RDDs, so the RDD, as the lowest-level foundation, is what has physical meaning.
  • As time passes, a DStream keeps producing RDDs, so an operation on a DStream is really an operation on the RDDs of fixed points in time, as the sketch below makes concrete.
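
The point that a DStream operation is really a per-batch RDD operation can be made concrete with transform, which exposes the RDD a DStream generates for each batch together with that batch's time. The following is a minimal, self-contained sketch (the local[2] master and localhost:9999 source are placeholder values, not part of the original example):

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

    object DStreamAsTimedRDDs {
      def main(args: Array[String]): Unit = {
        // placeholder master and app name; any real data source behaves the same way
        val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamAsTimedRDDs")
        val ssc = new StreamingContext(conf, Seconds(1))

        val lines = ssc.socketTextStream("localhost", 9999)

        // transform hands us the RDD generated for each batch plus its batch time,
        // which is exactly the "time-indexed sequence of RDDs" model described above
        val tagged = lines.transform { (rdd: RDD[String], time: Time) =>
          rdd.map(line => s"[$time] ${line.toUpperCase}")
        }

        tagged.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }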

2 The True Face of the StreamingContext Entry Class

2.1 The StreamingContext Scaladoc in Detail

 * Main entry point for Spark Streaming functionality. It provides methods used to create
 * [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be either
 * created by providing a Spark master URL and an appName, or from a org.apache.spark.SparkConf
 * configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext.
 * The associated SparkContext can be accessed using `context.sparkContext`. After
 * creating and transforming DStreams, the streaming computation can be started and stopped
 * using `context.start()` and `context.stop()`, respectively.
 * `context.awaitTermination()` allows the current thread to wait for the termination
 * of the context by `stop()` or by an exception.

2.2 A Basic Spark Streaming Example and a Deeper Look

  • A basic Spark Streaming example

      // Imports added for completeness; StreamingExamples is the logging helper
      // from Spark's examples package (org.apache.spark.examples.streaming).
      import org.apache.spark.SparkConf
      import org.apache.spark.examples.streaming.StreamingExamples
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object NetworkWordCount {
        def main(args: Array[String]) {
          if (args.length < 2) {
            System.err.println("Usage: NetworkWordCount <hostname> <port>")
            System.exit(1)
          }
      
          StreamingExamples.setStreamingLogLevels()
          
          // Create the context with a 1 second batch size
          val sparkConf = new SparkConf().setAppName("NetworkWordCount")
          val ssc = new StreamingContext(sparkConf, Seconds(1))
      
          // Create a socket stream on target ip:port and count the
          // words in input stream of \n delimited text (eg. generated by 'nc')
          // Note that no duplication in storage level only for running locally.
          // Replication necessary in distributed scenario for fault tolerance.
          val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
          val words = lines.flatMap(_.split(" "))
          val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
          wordCounts.print()
          ssc.start()
          ssc.awaitTermination()
        }
      }
    
  • How StreamingContext creates its SparkContext

       def this(conf: SparkConf, batchDuration: Duration) = {
          this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
        }
        
      private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
          new SparkContext(conf)
        }
    
  • The socketTextStream dependency chain (see the excerpt right after this bullet)
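
To see the dependency chain that socketTextStream sets up, it helps to look at the two StreamingContext methods involved (a lightly trimmed excerpt of the Spark 2.x source, scaladoc omitted). socketTextStream delegates to socketStream, which creates a SocketInputDStream, a subclass of ReceiverInputDStream, InputDStream, and ultimately of the DStream template discussed next:

    // socketTextStream -> socketStream -> SocketInputDStream
    // class hierarchy: SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream
    def socketTextStream(
        hostname: String,
        port: Int,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
      socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
    }

    def socketStream[T: ClassTag](
        hostname: String,
        port: Int,
        converter: (InputStream) => Iterator[T],
        storageLevel: StorageLevel
      ): ReceiverInputDStream[T] = {
      new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
    }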

(1) The DStream super-class logic template, as its English scaladoc explains:

    * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
     * sequence of RDDs (of the same type) representing a continuous stream of data (see
     * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
     * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
     * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
     * transforming existing DStreams using operations such as `map`,
     * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
     * periodically generates a RDD, either from live data or by transforming the RDD generated by a
     * parent DStream.
     *
     * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
     * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
     * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
     * `join`. These operations are automatically available on any DStream of pairs
     * (e.g., DStream[(Int, Int)]) through implicit conversions.
     *
     * A DStream internally is characterized by a few basic properties:
     *  - A list of other DStreams that the DStream depends on
     *  - A time interval at which the DStream generates an RDD
     *  - A function that is used to generate an RDD after each time interval

(2) Excerpts from the DStream source

Except for the very first DStream, every DStream depends on the DStream(s) before it.
A DStream generates one RDD for every time interval:
    abstract class DStream[T: ClassTag] (
        @transient private[streaming] var ssc: StreamingContext
      ) extends Serializable with Logging {
    
      validateAtInit()
    
      // =======================================================================
      // Methods that should be implemented by subclasses of DStream
      // =======================================================================
    
      /** Time interval after which the DStream generates an RDD */
      def slideDuration: Duration
    
      /** List of parent DStreams on which this DStream depends on */
      def dependencies: List[DStream[_]]
    
      /** Method that generates an RDD for the given time */
      def compute(validTime: Time): Option[RDD[T]]
    
      // =======================================================================
      // Methods and fields available on all DStreams
      // =======================================================================
generatedRDDs holds the RDD that the DStream generates for each batch:
      // RDDs generated, marked as private[streaming] so that testsuites can access it
      @transient
      private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
    
      // Time zero for the DStream
      private[streaming] var zeroTime: Time = null
    
      // Duration for which the DStream will remember each RDD created
      private[streaming] var rememberDuration: Duration = null
    
      // Storage level of the RDDs in the stream
      private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
The computation function getOrCompute:
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
      private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
        // If RDD was already generated, then retrieve it from HashMap,
        // or else compute the RDD
        generatedRDDs.get(time).orElse {
          // Compute the RDD if time is valid (e.g. correct time in a sliding window)
          // of RDD generation, else generate nothing.
          if (isTimeValid(time)) {
    
            val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
              // Disable checks for existing output directories in jobs launched by the streaming
              // scheduler, since we may need to write output to an existing directory during checkpoint
              // recovery; see SPARK-4835 for more details. We need to have this call here because
              // compute() might cause Spark jobs to be launched.
              SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
                compute(time)
              }
            }
            rddOption.foreach { case newRDD =>
              // Register the generated RDD for caching and checkpointing
              if (storageLevel != StorageLevel.NONE) {
                newRDD.persist(storageLevel)
                logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
              }
              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
                newRDD.checkpoint()
                logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
              }
              generatedRDDs.put(time, newRDD)
            }
            rddOption
          } else {
            None
          }
        }
      }
The job-generation function generateJob:
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   
  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
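
To get a concrete feel for the three abstract members above (dependencies, slideDuration, compute), the simplest subclass to read is MappedDStream, the DStream that the map operator creates; the excerpt below is lightly simplified from the Spark source:

    // Lightly simplified from the Spark source: the DStream produced by map().
    // It depends on exactly one parent, slides at the parent's rate, and computes
    // its per-batch RDD by mapping the parent's RDD for the same batch time.
    private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] (
        parent: DStream[T],
        mapFunc: T => U
      ) extends DStream[U](parent.ssc) {

      override def dependencies: List[DStream[_]] = List(parent)

      override def slideDuration: Duration = parent.slideDuration

      override def compute(validTime: Time): Option[RDD[U]] = {
        parent.getOrCompute(validTime).map(_.map[U](mapFunc))
      }
    }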

2.3 DStream, Its Subclasses, and How DStream Actions Are Triggered

The DStream output (trigger) operations currently available are print, saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles, and foreachRDD. Each of these output operations creates a ForeachDStream object and registers it in the outputStreams member of the DStreamGraph.

  final private[streaming] class DStreamGraph extends Serializable with Logging {
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

Within StreamingContext, the DStreamGraph is a key member, and it is the component in charge of these output (action) operations:

  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      _cp.graph.setContext(this)
      _cp.graph.restoreCheckpointData()
      _cp.graph
    } else {
      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(_batchDur)
      newGraph
    }
  }

2.4 Backtracking Through the DStreamGraph

How are the dependency relationships and operators of the individual DStream objects finally strung into a single chain? The DStreamGraph backtracks from outputStreams to generate jobs, as the excerpt below shows, and the chain of operators is actually executed only once the StreamingContext has been started.
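
The backtracking itself lives in DStreamGraph.generateJobs: for every batch time, each registered output stream is asked to generate a job, and generating that job walks back through getOrCompute on the parent DStreams all the way to the input streams. The excerpt below is lightly simplified from the Spark source:

    def generateJobs(time: Time): Seq[Job] = {
      logDebug("Generating jobs for time " + time)
      val jobs = this.synchronized {
        outputStreams.flatMap { outputStream =>
          // each output stream (a ForeachDStream) builds one job per batch time
          val jobOption = outputStream.generateJob(time)
          jobOption.foreach(_.setCallSite(outputStream.creationSite))
          jobOption
        }
      }
      logDebug("Generated " + jobs.length + " jobs for time " + time)
      jobs
    }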

3 ForeachDStream and foreachRDD: Joined at the Hip

As the code below shows, the print function in the abstract DStream template actually defines a foreachFunc and passes it to foreachRDD; foreachRDD wraps it in a ForeachDStream, and that ForeachDStream is ultimately registered with the StreamingContext (via its DStreamGraph).
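
The link between print and that registration is the private foreachRDD overload, which wraps the user function in a ForeachDStream and registers it as an output stream (lightly trimmed from the Spark source):

    // print, saveAsTextFiles, etc. all funnel into this private overload, which
    // wraps the function in a ForeachDStream and registers it with the DStreamGraph.
    private def foreachRDD(
        foreachFunc: (RDD[T], Time) => Unit,
        displayInnerRDDOps: Boolean): Unit = {
      new ForeachDStream(this,
        context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
    }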

  • The print function

       * Print the first num elements of each RDD generated in this DStream. This is an output
       * operator, so this DStream will be registered as an output stream and there materialized.
        def print(num: Int): Unit = ssc.withScope {
          def foreachFunc: (RDD[T], Time) => Unit = {
            (rdd: RDD[T], time: Time) => {
              val firstNum = rdd.take(num + 1)
              // scalastyle:off println
              println("-------------------------------------------")
              println(s"Time: $time")
              println("-------------------------------------------")
              firstNum.take(num).foreach(println)
              if (firstNum.length > num) println("...")
              println()
              // scalastyle:on println
            }
          }
          foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
        }
    
  • The saveAsTextFiles function

       def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
          val saveFunc = (rdd: RDD[T], time: Time) => {
            val file = rddToFileName(prefix, suffix, time)
            rdd.saveAsTextFile(file)
          }
          this.foreachRDD(saveFunc, displayInnerRDDOps = false)
        }
    
  • The register function

       * Register this streaming as an output stream. This would ensure that RDDs of this
       * DStream will be generated.
        private[streaming] def register(): DStream[T] = {
          ssc.graph.addOutputStream(this)
          this
        }
    

4 Summary: Input Streams and Output Streams

  • An InputDStream defines the business-side handling of a data source, but the Receiver is the real tap that turns the flow on: running on an executor, the Receiver receives the stream data, buffers it, accumulates it into blocks, and hands those blocks to the BlockManager, which finally makes them available to the job of the corresponding batch interval (see the receiver sketch after this list).
  • On the driver side there is a ReceiverTracker that keeps supervising the executors to start Receivers (for example, it ships the Receivers to the executors wrapped in an RDD; note that it is the Receiver itself that is distributed as an RDD), and it also manages the metadata of the data blocks waiting to be assigned to jobs.
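
A minimal custom receiver makes the executor-side half of this concrete: store() hands each record to the receiver supervisor, which buffers records into blocks and registers them with the BlockManager. This is only a sketch following the public Receiver API (the class name, host, and port are made up for illustration):

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import java.nio.charset.StandardCharsets

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Runs on an executor: each store() call hands a record to the receiver
    // supervisor, which accumulates records into blocks for the BlockManager.
    class SimpleSocketReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

      override def onStart(): Unit = {
        new Thread("SimpleSocketReceiver") {
          override def run(): Unit = receive()
        }.start()
      }

      override def onStop(): Unit = { /* the receiving thread exits once isStopped() is true */ }

      private def receive(): Unit = {
        try {
          val socket = new Socket(host, port)
          val reader = new BufferedReader(
            new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
          var line = reader.readLine()
          while (!isStopped() && line != null) {
            store(line)               // buffered into blocks on the executor side
            line = reader.readLine()
          }
          reader.close()
          socket.close()
          restart("Trying to connect again")
        } catch {
          case t: Throwable => restart("Error receiving data", t)
        }
      }
    }

Such a receiver would be wired in with ssc.receiverStream(new SimpleSocketReceiver(host, port)), which is essentially what socketTextStream does with its built-in SocketReceiver.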

5 The Secrets of StreamingContext Startup

5.1 Starting the StreamingContext Kicks Off the JobScheduler (Close Partners)

private[streaming] val scheduler = new JobScheduler(this)

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()
            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
              
              scheduler.start()     <= the key step
              
            }
            state = StreamingContextState.ACTIVE
            scheduler.listenerBus.post(
              StreamingListenerStreamingStarted(System.currentTimeMillis()))
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        logDebug("Adding shutdown hook") // force eager creation of logger
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
        // Registering Streaming Metrics at the start of the StreamingContext
        assert(env.metricsSystem != null)
        env.metricsSystem.registerSource(streamingSource)
        uiTab.foreach(_.attach())
        logInfo("StreamingContext started")
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

5.2 Once Started, the JobScheduler Commands the Whole Show

  • Driver side: start the receiverTracker => responsible for receiving data, caching it, and generating blocks

  • Driver side: start the jobGenerator => responsible for initializing the DStreamGraph, converting DStreams into RDDs, generating jobs, and submitting them for execution (see the JobGenerator excerpt after the listing below)

      def start(): Unit = synchronized {
          if (eventLoop != null) return // scheduler has already been started
      
      logDebug("Starting JobScheduler")
      eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
        override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    
        override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
      }
      
      eventLoop.start()                                <= the key step: listening in all directions
    
      // attach rate controllers of input streams to receive batch completion updates
      for { 
      
        inputDStream <- ssc.graph.getInputStreams      <= the key step
       
        rateController <- inputDStream.rateController
      } ssc.addStreamingListener(rateController)
    
      listenerBus.start()
      
      receiverTracker = new ReceiverTracker(ssc)        <= the key step
      
      inputInfoTracker = new InputInfoTracker(ssc)  <= the key step: manages all input streams and the statistics of their input data
    
      val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
        case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
        case _ => null
      }
    
      executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
        executorAllocClient,
        receiverTracker,
        ssc.conf,
        ssc.graph.batchDuration.milliseconds,
        clock)
      executorAllocationManager.foreach(ssc.addStreamingListener)
      
      receiverTracker.start()                         <= the key step
      jobGenerator.start()                            <= the key step
      
      executorAllocationManager.foreach(_.start())
      logInfo("Started JobScheduler")
    }
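
Once jobGenerator.start() runs, a RecurringTimer inside JobGenerator fires every batch interval and posts a GenerateJobs event; handling that event is what turns the DStreamGraph into a JobSet and hands it back to the JobScheduler. The excerpt below is lightly simplified from JobGenerator in the Spark source:

    // A timer posts GenerateJobs(time) once per batch interval ...
    private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
      longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

    // ... and handling that event builds and submits the batch's JobSet.
    private def generateJobs(time: Time): Unit = {
      Try {
        // assign the blocks received since the last batch to this batch
        jobScheduler.receiverTracker.allocateBlocksToBatch(time)
        // walk the DStreamGraph's output streams to build this batch's jobs
        graph.generateJobs(time)
      } match {
        case Success(jobs) =>
          val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
          jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
        case Failure(e) =>
          jobScheduler.reportError("Error generating jobs for time " + time, e)
      }
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    }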
    

5.3 The JobScheduler Listens in All Directions

    private var eventLoop: EventLoop[JobSchedulerEvent] = null

    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
  • JobStarted

  • JobCompleted

  • ErrorReported

       private def processEvent(event: JobSchedulerEvent) {
          try {
            event match {
              case JobStarted(job, startTime) => handleJobStart(job, startTime)
              case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
              case ErrorReported(m, e) => handleError(m, e)
            }
          } catch {
            case e: Throwable =>
              reportError("Error in job scheduler", e)
          }
        }
    

6 Summary

This article has dissected the StreamingContext startup flow and the DStream template source code. It does not draw on any online blog posts. It is hereby declared original content; reproduction or commercial use is prohibited.

Qin Kaixin (秦凯新), Shenzhen, 2018