Spark Streaming Source Code Deep Dive: How the Data Stream Is Periodically Turned from currentBuffer into Blocks - Spark in Commercial Practice


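This blog series summarizes and shares cases drawn from real commercial environments, together with Spark source-code walkthroughs and practical guidance. Please keep following the series. Copyright notice: this Spark source-code and commercial-practice series belongs to the author, Qin Kaixin (秦凯新). Reproduction is prohibited; study is welcome.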

1 ReceiverTracker: Pull One Hair and the Whole Body Moves

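  • The figure below dissects how ReceiverTracker submits the Job for the receiver RDD. The part on the right with the yellow background is ReceiverSupervisorImpl, the focus of this section: it starts the receiver and drives processes such as Block generation.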

  • As the figure below shows, startReceiverFunc calls supervisor.start(). start() is defined in the parent class ReceiverSupervisor and kicks off two steps:

        /** Start the supervisor */
        def start(): Unit = {
          onStart()
          startReceiver()
        }
    

    (1) The first step runs ReceiverSupervisorImpl's onStart() method, which starts every BlockGenerator in registeredBlockGenerators and thereby begins generating and managing data blocks.

    override protected def onStart(): Unit = {
      registeredBlockGenerators.asScala.foreach { _.start() }
    }
    

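    (2) The second step, startReceiver, calls ReceiverSupervisorImpl's onReceiverStart method to find out whether the receiver was successfully registered with the ReceiverTracker; only if it was is the receiver started.

    The supervisor's startReceiver method: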
    
    def startReceiver(): Unit = synchronized {
      try {
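        if (onReceiverStart()) {  // <= stroke of genius: register the receiver with the driver over endpoint RPC
          logInfo(s"Starting receiver $streamId")
          receiverState = Started
          receiver.onStart()      // <= stroke of genius: hand control to the concrete receiver
          logInfo(s"Called receiver $streamId onStart")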
        } else {
          // The driver refused us
          stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
        }
      } catch {
        case NonFatal(t) =>
          stop("Error starting receiver " + streamId, Some(t))
      }
    }
    
    
    ReceiverSupervisorImpl's onReceiverStart method:

    override protected def onReceiverStart(): Boolean = {
      val msg = RegisterReceiver(
        streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
      trackerEndpoint.askSync[Boolean](msg)
    }
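The start sequence above follows a template-method pattern: the parent's start() fixes the order, and the subclass supplies onStart() and onReceiverStart(). The sketch below reproduces only that control flow under simplifying assumptions: MiniSupervisor, MiniSupervisorImpl and MiniReceiver are hypothetical names, the state is a plain string, and the RPC call trackerEndpoint.askSync is replaced by a local function `register: () => Boolean`.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for a concrete Receiver: only the onStart hook matters here.
trait MiniReceiver {
  def onStart(): Unit
}

// Hypothetical parent class: start() fixes the order of the two steps.
abstract class MiniSupervisor(receiver: MiniReceiver) {
  @volatile var state: String = "Initialized"
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]

  protected def onStart(): Unit            // e.g. start the block generators
  protected def onReceiverStart(): Boolean // e.g. ask the driver for permission

  def start(): Unit = {
    onStart()
    startReceiver()
  }

  def startReceiver(): Unit = synchronized {
    try {
      if (onReceiverStart()) {
        state = "Started"
        receiver.onStart()
      } else {
        stop("driver refused to start the receiver")
      }
    } catch {
      case NonFatal(t) => stop("error starting receiver: " + t.getMessage)
    }
  }

  def stop(reason: String): Unit = {
    state = "Stopped"
    events += s"stopped: $reason"
  }
}

// Registration is simulated by a local function instead of trackerEndpoint.askSync.
class MiniSupervisorImpl(receiver: MiniReceiver, register: () => Boolean)
    extends MiniSupervisor(receiver) {
  override protected def onStart(): Unit = events += "block generators started"
  override protected def onReceiverStart(): Boolean = register()
}
```

With `() => true` the supervisor ends in "Started" after calling the receiver's onStart; with `() => false` it takes the driver-refused branch and ends in "Stopped", and an exception thrown during registration is turned into a stop as well.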
    

    (3) How ReceiverTracker handles receiver registration requests

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      
        // Remote messages
        case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
          // <= stroke of genius: registerReceiver manages receiverTrackingInfos
          val successful =
            registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
          context.reply(successful)
          
        case AddBlock(receivedBlockInfo) =>
          if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
            walBatchingThreadPool.execute(new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                if (active) {
                  context.reply(addBlock(receivedBlockInfo))
                } else {
                  throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
                }
              }
            })
          } else {
            context.reply(addBlock(receivedBlockInfo))
          }
          
        case DeregisterReceiver(streamId, message, error) =>
          deregisterReceiver(streamId, message, error)
          context.reply(true)
    
        // Local messages
        case AllReceiverIds =>
          context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
          
        case GetAllReceiverInfo =>
          context.reply(receiverTrackingInfos.toMap)
          
        case StopAllReceivers =>
          assert(isTrackerStopping || isTrackerStopped)
          stopReceivers()
          context.reply(true)
      }
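receiveAndReply is essentially one pattern match over message types, with each branch replying exactly once. The toy tracker below shows that shape under stated assumptions: the message classes RegisterMsg, AddBlockMsg, DeregisterMsg and AllIdsMsg are hypothetical and carry far fewer fields than Spark's, the reply is returned instead of being sent through context.reply, and single-threaded use is assumed.

```scala
import scala.collection.mutable

// Hypothetical, much-reduced message types (Spark's carry more fields).
sealed trait TrackerMsg
final case class RegisterMsg(streamId: Int, host: String) extends TrackerMsg
final case class AddBlockMsg(streamId: Int, blockId: String) extends TrackerMsg
final case class DeregisterMsg(streamId: Int) extends TrackerMsg
case object AllIdsMsg extends TrackerMsg

// Single-threaded use is assumed; no locking is done here.
class MiniTracker(expectedStreamIds: Set[Int]) {
  private val activeHosts = mutable.Map.empty[Int, String]    // streamId -> host
  private val blocks = mutable.Map.empty[Int, Vector[String]] // streamId -> block ids

  def blocksOf(streamId: Int): Vector[String] = blocks.getOrElse(streamId, Vector.empty)

  // One branch per message type; the return value plays the role of context.reply(...).
  def receiveAndReply(msg: TrackerMsg): Any = msg match {
    case RegisterMsg(id, host) =>
      // Spark throws for an unexpected stream id; this sketch simply refuses.
      val ok = expectedStreamIds.contains(id)
      if (ok) activeHosts(id) = host
      ok
    case AddBlockMsg(id, blockId) =>
      blocks(id) = blocksOf(id) :+ blockId
      true
    case DeregisterMsg(id) =>
      activeHosts.remove(id)
      true
    case AllIdsMsg =>
      activeHosts.keys.toSeq.sorted
  }
}
```

Using a sealed trait lets the compiler warn when a message type is not handled, which is one reason this style suits RPC endpoints.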
    

    (4) How registerReceiver manages receiverTrackingInfos

     /** Register a receiver */
      private def registerReceiver(
          streamId: Int,
          typ: String,
          host: String,
          executorId: String,
          receiverEndpoint: RpcEndpointRef,
          senderAddress: RpcAddress
        ): Boolean = {
        if (!receiverInputStreamIds.contains(streamId)) {
          throw new SparkException("Register received for unexpected id " + streamId)
        }
    
        if (isTrackerStopping || isTrackerStopped) {
          return false
        }
    
        val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
        val acceptableExecutors = if (scheduledLocations.nonEmpty) {
            // This receiver is registering and it's scheduled by
            // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
            scheduledLocations.get
          } else {
            // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
            // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
            scheduleReceiver(streamId)
          }
    
        def isAcceptable: Boolean = acceptableExecutors.exists {
          case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
          case loc: TaskLocation => loc.host == host
        }
    
        if (!isAcceptable) {
          // Refuse it since it's scheduled to a wrong executor
          false
        } else {
          val name = s"${typ}-${streamId}"
          val receiverTrackingInfo = ReceiverTrackingInfo(
            streamId,
            ReceiverState.ACTIVE,
            scheduledLocations = None,
            runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
            name = Some(name),
            endpoint = Some(receiverEndpoint))
          receiverTrackingInfos.put(streamId, receiverTrackingInfo)  // <= stroke of genius
          listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
          logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
          true
        }
      }
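The heart of registerReceiver is the isAcceptable check. In Spark, ExecutorCacheTaskLocation is itself a TaskLocation, so the executor case must come first in the match; otherwise the host-only case would also accept an executor-pinned location running on the wrong executor. The sketch below reproduces that ordering with hypothetical types Loc, HostLoc and ExecutorLoc; it is not Spark's code.

```scala
// Hypothetical stand-ins: like ExecutorCacheTaskLocation, ExecutorLoc is also a Loc.
sealed trait Loc { def host: String }
final case class HostLoc(host: String) extends Loc
final case class ExecutorLoc(host: String, executorId: String) extends Loc

object Acceptance {
  // Executor-pinned locations must match the executor id exactly;
  // any other location accepts every executor on the same host.
  // The catch-all `loc: Loc` case mirrors `loc: TaskLocation`, so case order matters.
  def isAcceptable(acceptable: Seq[Loc], host: String, executorId: String): Boolean =
    acceptable.exists {
      case loc: ExecutorLoc => loc.executorId == executorId
      case loc: Loc         => loc.host == host
    }
}
```

For example, a receiver scheduled on ExecutorLoc("h1", "3") that registers from executor "4" on host h1 is refused; with the two cases swapped it would be accepted.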
  • The overall architecture of ReceiverSupervisorImpl is shown in the figure below:

2 A Deep Dive into BlockGenerator

2.1 How SocketInputDStream receives and stores its data stream

  • Receiving relies on the ReceiverSupervisor: receive() reads the socket and hands every record to store():

       /** Create a socket connection and receive data until receiver is stopped */
       def receive(): Unit = {
         try {
           val iterator = bytesToObjects(socket.getInputStream())
           while (!isStopped && iterator.hasNext) {
             store(iterator.next())  // <= stroke of genius: hand each record to the supervisor
           }
           if (!isStopped()) {
             restart("Socket data stream had no more data")
           } else {
             logInfo("Stopped receiving")
           }
         } catch {
           case NonFatal(e) =>
             logWarning("Error receiving data", e)
             restart("Error receiving data", e)
         } finally {
           onStop()
         }
       }
    
  • store() in turn relies on the ReceiverSupervisor's pushSingle method:

     /**
      * Store a single item of received data to Spark's memory.
      * These single items will be aggregated together into data blocks before
      * being pushed into Spark's memory.
      */
     def store(dataItem: T): Unit = {
       supervisor.pushSingle(dataItem)  // <= stroke of genius
     }
    
  • pushSingle relies on the ReceiverSupervisor's internal defaultBlockGenerator:

    /** Push a single record of received data into block generator. */
    def pushSingle(data: Any): Unit = {
      defaultBlockGenerator.addData(data)  // <= stroke of genius
    }
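The three excerpts above form a plain delegation chain: receive() calls store(), store() calls pushSingle(), and pushSingle() calls addData(). The sketch below keeps only that chain, with hypothetical classes SimpleReceiver, IteratorReceiver, SimpleSupervisor and SimpleBlockGenerator; an Iterator stands in for the socket stream, and block cutting is left out.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified versions of the three layers discussed above.
class SimpleBlockGenerator {
  private val currentBuffer = new ArrayBuffer[Any]
  def addData(data: Any): Unit = synchronized { currentBuffer += data }
  def bufferedCount: Int = synchronized { currentBuffer.size }
}

class SimpleSupervisor(val defaultBlockGenerator: SimpleBlockGenerator) {
  // Like pushSingle: forward one record to the default generator.
  def pushSingle(data: Any): Unit = defaultBlockGenerator.addData(data)
}

abstract class SimpleReceiver[T](supervisor: SimpleSupervisor) {
  @volatile var stopped = false
  // Like Receiver.store(dataItem): no buffering here, just delegation.
  def store(dataItem: T): Unit = supervisor.pushSingle(dataItem)
}

// Plays the role of SocketReceiver.receive(); an Iterator replaces the socket stream.
class IteratorReceiver(source: Iterator[String], supervisor: SimpleSupervisor)
    extends SimpleReceiver[String](supervisor) {
  def receive(): Unit =
    while (!stopped && source.hasNext) store(source.next())
}
```

After receiving an iterator of three strings, the generator's buffer holds three records; no block exists yet, because cutting blocks is the timer's job.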
    

2.2 BlockGenerator: The Heavy Sword Has No Edge

  • Brother 1: blockIntervalTimer
  • Brother 2: blockPushingThread

Sword in hand, BlockGenerator roams the world, making the most of its prime. Two threads take care of storing and managing blocks:

    private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")

Thread one (driven by a RecurringTimer, calls updateCurrentBuffer every blockIntervalMs):

    private val blockIntervalTimer =
      new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

Thread two (keeps pushing finished blocks):

    private val blockPushingThread = new Thread() { override def run(): Unit = keepPushingBlocks() }
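Because a block is cut every spark.streaming.blockInterval, the number of blocks one receiver contributes to a batch, and hence the number of partitions (map tasks) for that receiver's data in the batch, is roughly batchInterval / blockInterval. The Spark Streaming tuning guide gives the example of a 200 ms block interval producing 10 tasks per 2-second batch. BlockMath is a hypothetical helper that only does this arithmetic.

```scala
object BlockMath {
  // Approximate number of blocks one receiver cuts per batch
  // (each block becomes one partition of that batch's data).
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0, "block interval must be positive")
    batchIntervalMs / blockIntervalMs
  }
}
```

In practice this is an upper bound: updateCurrentBuffer (section 2.3) only creates a block when currentBuffer is non-empty, so a quiet stream yields fewer blocks.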
  • Both brothers are launched together in start():

      def start(): Unit = synchronized {
         if (state == Initialized) {
           state = Active
    
           blockIntervalTimer.start()            // <= stroke of genius
           blockPushingThread.start()            // <= stroke of genius
           
           logInfo("Started BlockGenerator")
         } else {
           throw new SparkException(
             s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
         }
       }    
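RecurringTimer is what turns blockIntervalMs into periodic calls of updateCurrentBuffer. This sketch assumes, as in Spark's RecurringTimer, that the first firing is aligned to the next multiple of the period and that the callback receives the scheduled firing time; that is why updateCurrentBuffer can use time - blockIntervalMs as the start of the interval just collected. SimpleClock, SystemClockSketch and SimpleRecurringTimer are hypothetical names, not Spark's classes.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical clock abstraction; Spark has its own Clock trait, this is not it.
trait SimpleClock {
  def now(): Long
  def waitTill(targetMs: Long): Unit
}

object SystemClockSketch extends SimpleClock {
  def now(): Long = System.currentTimeMillis()
  def waitTill(targetMs: Long): Unit = {
    var remaining = targetMs - now()
    while (remaining > 0) {
      Thread.sleep(math.min(remaining, 10L))
      remaining = targetMs - now()
    }
  }
}

// Calls callback(fireTime) every periodMs; fire times are multiples of the period.
class SimpleRecurringTimer(clock: SimpleClock, periodMs: Long, callback: Long => Unit, name: String) {
  private val stopped = new AtomicBoolean(false)

  def firstFireTime(): Long = (clock.now() / periodMs + 1) * periodMs

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      var next = firstFireTime()
      while (!stopped.get()) {
        clock.waitTill(next)
        if (!stopped.get()) callback(next) // the callback receives the scheduled time
        next += periodMs
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped.set(true); thread.join() }
}
```

With a 200 ms period, a firing at time 1200 describes the interval [1000, 1200), which matches how updateCurrentBuffer names its block.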
    

2.3 BlockGenerator: Accumulate Slowly, Release Swiftly

  • The water tank, which buffers the discrete records of the stream: currentBuffer

    @volatile private var currentBuffer = new ArrayBuffer[Any]

  • The bottled water, i.e. the tank's contents filled into buckets: blocksForPushing

      private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
    
  • The InputDStream's receiver fills the tank through addData:

    /** Push a single data item into the buffer. */
    def addData(data: Any): Unit = {
      if (state == Active) {
        waitToPush()
        synchronized {
          if (state == Active) {
            currentBuffer += data
          } else {
            throw new SparkException(
              "Cannot add data as BlockGenerator has not been started or has been stopped")
          }
        }
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
    
  • blockIntervalTimer periodically turns the tank's contents into bottled water and queues it for management:

       /** Change the buffer to which single records are added to. */
        private def updateCurrentBuffer(time: Long): Unit = {
          try {
            var newBlock: Block = null
            synchronized {
              if (currentBuffer.nonEmpty) {
                val newBlockBuffer = currentBuffer
                currentBuffer = new ArrayBuffer[Any]
                val blockId = StreamBlockId(receiverId, time - blockIntervalMs)  // <= stroke of genius
                listener.onGenerateBlock(blockId)
                newBlock = new Block(blockId, newBlockBuffer)                    // <= stroke of genius
              }
            }

            if (newBlock != null) {
              blocksForPushing.put(newBlock)  // put is blocking when queue is full  <= stroke of genius
            }
          } catch {
            case ie: InterruptedException =>
              logInfo("Block updating timer thread was interrupted")
            case e: Exception =>
              reportError("Error in block updating thread", e)
          }
        }
    
  • keepPushingBlocks: watch me stir up the storm
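Before following keepPushingBlocks, here is the tank-to-bottle handoff of section 2.3 reduced to its essentials: records are appended under a lock, the timer swaps the whole buffer out under the same lock, and the finished block goes into a bounded ArrayBlockingQueue. SketchBlockId, SketchBlock and SketchBlockGenerator are hypothetical stand-ins (the id name imitates the "input-<streamId>-<uniqueId>" form of StreamBlockId), and the rate limiting done by waitToPush() is left out.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for StreamBlockId and Block.
final case class SketchBlockId(receiverId: Int, uniqueId: Long) {
  def name: String = s"input-$receiverId-$uniqueId"
}
final case class SketchBlock(id: SketchBlockId, buffer: ArrayBuffer[Any])

class SketchBlockGenerator(receiverId: Int, blockIntervalMs: Long, queueSize: Int) {
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  val blocksForPushing = new ArrayBlockingQueue[SketchBlock](queueSize)

  // Producer side: called by the receiver thread for every record.
  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // Timer side: swap the buffer under the lock, then enqueue outside it.
  def updateCurrentBuffer(time: Long): Unit = {
    var newBlock: SketchBlock = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val full = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        // The timer fires at the end of an interval, so the id uses its start.
        newBlock = SketchBlock(SketchBlockId(receiverId, time - blockIntervalMs), full)
      }
    }
    if (newBlock != null) blocksForPushing.put(newBlock) // blocks while the queue is full
  }
}
```

As in the original, put() happens outside the synchronized section; one effect is that a full queue stalls only the timer thread, while addData can keep filling the fresh buffer.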

2.4 How keepPushingBlocks inside BlockGenerator stirs up the storm

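  • The blockPushingThread calls defaultBlockGeneratorListener, which stores the Block through the BlockManager (BlockManagerBasedBlockHandler uses putIterator for the ArrayBuffer blocks produced here and putBytes for byte-buffer blocks) and then sends AddBlock(blockInfo) to the driver-side ReceiverTracker to report that the block has been added.
  • Through this endpoint RPC, the ReceiverTracker learns the storage metadata of every Block.
  • Depending on whether the write-ahead log (WAL) is enabled, the block handler is either WriteAheadLogBasedBlockHandler or BlockManagerBasedBlockHandler. The switch is read via conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false), i.e. spark.streaming.receiver.writeAheadLog.enable, which defaults to false.
  • ReceiverSupervisorImpl orchestrates all of this, because its members include registeredBlockGenerators, defaultBlockGeneratorListener, trackerEndpoint and the other key players. From starting the receiver, to generating blocks, to managing them, to reporting them to the ReceiverTracker, it truly plays an all-encompassing role.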
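The push side can be sketched as a consumer thread: while blocks are still being generated it polls the queue with a short timeout, and once generation stops it drains whatever is left; each block is stored by a handler chosen from the WAL flag and then reported to the driver. All names here (PushedBlock, BlockHandlerSketch, MemoryOnlyHandler, WalThenMemoryHandler, HandlerChoice, BlockPusherSketch) are hypothetical, the "WAL" is just an in-memory list, and the driver report is a plain function standing in for AddBlock over RPC.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

final case class PushedBlock(id: String, records: Seq[Any])

// Hypothetical stand-in for Spark's received-block handler.
trait BlockHandlerSketch { def storeBlock(b: PushedBlock): Unit }

// Plays the role of BlockManagerBasedBlockHandler: store only.
class MemoryOnlyHandler(store: ArrayBuffer[String]) extends BlockHandlerSketch {
  def storeBlock(b: PushedBlock): Unit = store.synchronized { store += b.id; () }
}

// Plays the role of WriteAheadLogBasedBlockHandler: log first, then store.
class WalThenMemoryHandler(wal: ArrayBuffer[String], store: ArrayBuffer[String]) extends BlockHandlerSketch {
  def storeBlock(b: PushedBlock): Unit = {
    wal.synchronized { wal += b.id; () }
    store.synchronized { store += b.id; () }
  }
}

object HandlerChoice {
  // Mirrors the boolean switch spark.streaming.receiver.writeAheadLog.enable (default false).
  def choose(walEnabled: Boolean, wal: ArrayBuffer[String], store: ArrayBuffer[String]): BlockHandlerSketch =
    if (walEnabled) new WalThenMemoryHandler(wal, store) else new MemoryOnlyHandler(store)
}

class BlockPusherSketch(
    queue: ArrayBlockingQueue[PushedBlock],
    handler: BlockHandlerSketch,
    reportToDriver: String => Unit) {
  @volatile var generating = true

  private def pushBlock(b: PushedBlock): Unit = {
    handler.storeBlock(b) // store the block (logging it first if the WAL is on)
    reportToDriver(b.id)  // then report it, like sending AddBlock(blockInfo)
  }

  val thread: Thread = new Thread("block-pusher-sketch") {
    override def run(): Unit = {
      // Poll with a timeout so the loop notices when generation stops.
      while (generating) {
        val b = queue.poll(10, TimeUnit.MILLISECONDS) // null on timeout
        if (b != null) pushBlock(b)
      }
      // Generation has stopped: drain whatever is still queued.
      while (!queue.isEmpty) pushBlock(queue.take())
    }
  }
}
```

Storing before reporting means the driver only learns about blocks that already exist on the executor, which is the order described in the bullets above.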

3 Summary

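What we have just watched is ReceiverSupervisorImpl's blockbuster of the century: writing, directing and starring in its own show, it solves the problem end to end.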

Qin Kaixin (秦凯新), Shenzhen, 2018