阅读 190

Spark 存储模块源码学习

其他更多java基础文章:
java基础学习(目录)


学习资料:
spark 源码分析之十八 -- Spark存储体系剖析 **
Spark Core源码精读计划#29:BlockManager主从及RPC逻辑 **
Spark Block Manager管理
BlockManager初始化和注册解密
Cloud Compution BlockManagerMaster
Spark自己的分布式存储系统BlockManager全解析
[Spark内核] 第38课:BlockManager架构原理、运行流程图和源码解密
Spark TaskMemoryManager如何为task分配执行内存
Spark存储体系——Block传输服务 **
Spark存储体系——块管理器BlockManager **
Spark MapOutputTracker浅析 **
shuffle服务与客户端 **

能力有限,目前还是个学习者的姿态,所以只是记录一下spark存储模块源码的学习过程。在学习的过程中发现上面几个是不错的学习资料,推荐给大家,带*号表示值得优先查看学习的资料。

因为每个资料都各有侧重点,所以可能在看的时候对一些没有细讲的类和架构不了解。下面我对上面的资料进行了总结,可以在学习的过程中对照的来看。

重要概念

  • BroadcastManager:广播管理器,在SparkEnv中是直接初始化
  • TorrentBroadcast:广播变量类
  • BroadcastFactory:广播变量工厂类,是Spark中broadcast中所有实现的接口。SparkContext使用BroadcastFactory实现来为整个Spark job实例化特定的broadcast。它有唯一子类 -- TorrentBroadcastFactory。
  • TorrentBroadcastFactory:其newBroadcast方法实例化TorrentBroadcast类
  • SerializerManager: 它是为各种Spark组件配置序列化,压缩和加密的组件,包括自动选择用于shuffle的Serializer。spark中的数据在network IO 或 local disk IO传输过程中。都需要序列化。其默认的 Serializer 是 org.apache.spark.serializer.JavaSerializer,在一定条件下,可以使用kryo,即org.apache.spark.serializer.KryoSerializer。
  • BlockInfo:记录了block 的相关信息。
  • BlockData:BlockData只是定义了数据转化的规范,并没有涉及具体的存储格式和读写流程,实现起来比较自由,所以前面说它是个松散的特征。BlockData目前有3个实现类:基于内存和ChunkedByteBuffer的ByteBufferBlockData、基于磁盘和File的DiskBlockData,以及加密的EncryptedBlockData
  • DiskBlockData:主要用于将磁盘中的block文件转换为指定的流或对象。
  • EncryptedBlockData:主要是用于加密的block磁盘文件转换为特定的流或对象。
  • ByteBufferBlockData:主要用于内存中的block数据转换为指定的流或对象。
  • BlockInfoManager:block在读写时有锁机制,并且委托给BlockInfoManager来管理。虽然BlockInfoManager的字面意思是“块信息管理器”,但管理块信息的意图并不明显,管理块的锁才是真正主要的任务。这个类公开的锁定接口是readers-writer锁。每次获取锁都会自动与正在运行的任务关联,并在任务完成或失败时自动释放锁。这个类是线程安全的。
    • lockForReading():为一个块加读锁
    • lockForWriting():为一个块加写锁
    • unlock():释放单个锁
    • releaseAllLocksForTask():释放当前TaskAttemptId对应的所有锁,并返回所有块ID的序列
    • downgradeLock():锁降级
    • removeBlock():从infos映射中删掉对应的BlockInfo,同时释放它对应的所有锁。
  • MemoryManager:管理Spark在JVM中的总体内存使用情况。该组件实现了跨任务划分可用内存以及在存储(内存使用缓存和数据传输)和执行(计算使用的内存,如shuffle,连接,排序和聚合)之间分配内存的策略。Spark环境中的每个JVM实例都会持有一个MemoryManager
  • TaskMemoryManager:TaskMemoryManager管理由各个任务分配的内存。任务与TaskMemoryManager交互,永远不会直接与JVM范围的MemoryManager交互。
  • MemoryPool:MemoryPool抽象类从逻辑上非常松散地定义了Spark内存池的一些基本约定。它负责管理可调大小的内存区域的簿记工作。可以这样理解,内存就是一个金库,它是一个负责记账的管家,主要负责记录内存的借出归还。这个类专门为MemoryManager而设计。给内存记账,其实从本质上来说,它不是Spark内存管理部分的核心功能,但是又很重要,它的核心方法都是被MemoryManager来调用的。理解了这个类,其子类就比较好理解了。记账的管家有两种实现,分别是StorageMemoryPoolExecutionMemoryPool
  • StorageMemoryPool:通过记账的方式,管理用于存储的可调整大小的内纯池。
    • acquireMemory():获取N个字节的内存给指定的block,如果有必要,即内存不够用了,可以将其他的从内存中驱除。
    • releaseMemory():释放内存
    • freeSpaceToShrinkPool():缩小此存储内存池可用空间spaceToFree字节的大小。这个方法是在收缩存储内存池之前调用的,因为这个方法返回值是要收缩的值。收缩存储内存池是为了扩大执行内存池,即这个方法是在收缩存储内存,扩大执行内存时用的,这个方法只是为了缩小存储内存池作准备的,并没有真正的缩小存储内存池。
  • ExecutionMemoryPool:实现策略和记帐,以便在任务之间共享大小可调的内存池,用于管理执行内存池。
    • memoryUsed():获取总的任务内存使用大小
    • getMemoryUsageForTask():获取某一任务内存使用大小
    • acquireMemory():给一个任务分配内存
    • releaseMemory():释放当前任务指定大小的内存空间
    • releaseAllMemoryForTask():释放当前任务已经使用的所有内存空间
  • MemoryManager:一种抽象内存管理器,用于管理Execution和Storage之间共享内存的方式。在这个上下文下,Execution内存是指用于在shuffle,join,sort和aggregation中进行计算的内存,而Storage内存是指用于在群集中缓存和传播内部数据的内存。 每个JVM都有一个MemoryManager。
    • maxOnHeapStorageMemory()
    • maxOffHeapStorageMemory():获取存储池最大使用内存,抽象方法,待子类实现
    • executionMemoryUsed()
    • storageMemoryUsed()
    • getExecutionMemoryUsageForTask():获取已使用内存
    • acquireStorageMemory()
    • acquireExecutionMemory()
    • acquireUnrollMemory():获取内存,抽象方法,待子类实现
    • releaseExecutionMemory()
    • releaseAllExecutionMemoryForTask()
    • releaseStorageMemory()
    • releaseAllStorageMemory()
    • releaseUnrollMemory():释放内存,这些请求都委托给对应的MemoryPool来做。什么是Unroll内存呢?RDD在被缓存之前,它所占用的内存空间是不连续的,而被缓存到存储内存之后,就以块的形式来存储,占用连续的内存空间了。Unroll就是这个将RDD固化在连续内存空间的过程,中文一般翻译为“展开”。Unroll过程使用的内存空间就是展开内存,它本质上是存储内存中比较特殊的一部分。
  • StaticMemoryManager:spark 1.6 之前 使用MemoryManager子类 StaticMemoryManager 来做内存管理。静态内存管理中的执行池和存储池之间有严格的界限,两个池的大小永不改变。1.6后如果想使用这个内存管理方式,设置 spark.memory.useLegacyMode 为 true即可(默认是false)。
  • UnifiedMemoryManager:spark1。6之后使用的默认类。这个MemoryManager保证了存储池和执行池之间的软边界,即可以互相借用内存来满足彼此动态的内存需求变化。执行和存储的占比由 spark.memory.storageFraction 配置,默认是0.6,即偏向于存储池。其中存储池的默认占比是由 spark.memory.storageFraction 参数决定,默认是 0.5 ,即 存储池默认占比 = 0.6 * 0.5 = 0.3 ,即存储池默认占比为0.3。存储池可以尽可能多的向执行池借用空闲内存。但是当执行池需要它的内存的时候,会把一部分内存池的内存对象从内存中驱逐出,直到满足执行池的内存需求。类似地,执行池也可以尽可能地借用存储池中的空闲内存,不同的是,执行内存不会被存储池驱逐出内存,也就是说,缓存block时可能会因为执行池占用了大量的内存池不能释放导致缓存block失败,在这种情况下,新的block会根据StorageLevel做相应处理。
  • MemoryEntry:本质上就是内存中一个block,指向了存储在内存中的真实数据。或者说是块在内存中的抽象表示。
  • MemoryStore:执行将Block存储在内存中的类,可以是反序列化Java对象的数组,也可以是由ByteBuffers序列化的。
    • putBytes():直接写入数据。先从MemoryManager中申请内存,如果申请成功,则调用回调方法 _bytes 获取ChunkedByteBuffer数据,然后封装成 SerializedMemoryEntry对象 ,最后将封装好的SerializedMemoryEntry对象缓存到 entries中。
    • putIteratorAsValues():把迭代器中值保存为内存中的Java对象
    • putIteratorAsBytes():把迭代器中值保存为内存中的序列化字节数据。所谓迭代器化的数据,就是指用Iterator[T]形式表示的块数据。之所以会这样表示,是因为有时单个块对应的数据可能过大,不能一次性存入内存。为了避免造成OOM,就可以一边遍历迭代器,一边周期性地写内存,并检查内存是否够用,就像翻书一样。“展开”(Unroll)这个词形象地说明了该过程
    • reserveUnrollMemoryForThisTask():申请展开内存的方法。思路大致上是先从MemoryManager 申请摊开内存,若成功,则根据memoryMode在堆内或堆外记录摊开内存的map上记录新分配的内存。
    • releaseUnrollMemoryForThisTask():释放展开内存的方法。先根据memoryMode获取到对应记录堆内或堆外内存的使用情况的map,然后在该task的摊开内存上减去这笔内存开销,如果减完之后,task使用内存为0,则直接从map中移除对该task的内存记录。
    • evictBlocksToFreeSpace():尝试驱逐block来释放指定大小的内存空间来存储给定的block,用途为淘汰现有的一些块,为新的块腾出空间。
  • DiskBlockManager:负责维护块数据与其在磁盘上存储位置的关系,是用来创建并维护逻辑block和落地后的block文件的映射关系的,它还负责创建用于shuffle或本地的临时文件。
    • createLocalDirs():创建本地存储目录
    • getFile():创建子目录及创建File对象
    • getAllFiles():获取所有文件
    • getAllBlocks():获取所有块ID
    • createTempLocalBlock()
    • createTempShuffleBlock():用来创建Spark计算过程中的中间结果以及Shuffle Write阶段输出的存储文件。它们的块ID分别用TempLocalBlockId和TempShuffleBlockId来表示。
    • DiskBlockManager.addShutdownHook()/doStop():绑定关闭钩子与关闭。如果deleteFilesOnStop标记为真,则在DiskBlockManager关闭之前,会调用Utils.deleteRecursively()方法递归地删掉本地存储目录。deleteFilesOnStop 通过构造方法传入
  • DiskStore:真正负责磁盘存储的组件,用来保存block 到磁盘的。
    • putBytes():写入字节,将数据写入到磁盘中。
    • getBytes():读取字节,getBytes获取的是BlockData数据,注意现在只是返回文件的引用,文件的内容并没有返回。
  • BlockManagerMaster:BlockManagerMaster 这个类是对 driver的 EndpointRef 的包装,可以说是 driver EndpointRef的一个代理类,主要负责和driver的交互,来获取跟底层存储相关的信息。BlockManager是典型的主从架构设计,不管Driver还是Executor上都要有BlockManager实例,那么必然就得存在一个协调组件——Spark中就是BlockManagerMaster了。BlockManagerMaster服务取名为Master其实是一个挺迷糊的名称;虽然它是Master,但是该对象并不是BlockManager的分布式服务的Master节点;而只是对Master节点一个连接符, 通过该连接符,从而已可以和真正的Master节点进行通信;不管是在Driver还是在Executor上,都有一个BlockManagerMaster.真正的Master节点是BlockManagerMasterEndpoint这个对象。
    • removeExecutor()
    • removeExecutorAsync():移除executor,有同步和异步两种方案,这两个方法只会在driver端使用。
    • registerBlockManager():向driver注册blockmanager
  • BlockManagerMasterEndpoint:其在以前的版本中也叫BlockManageMasterActor。BlockManageMasterEndpoint只存在于Driver上,Executor在BlockManageMaster中获取BlockManageMasterEndpoint的引用,并向其发送消息(使用ask、askSync、tell),实现和Driver的交互。
    • receiveAndReply():接受并回复RPC消息,通过覆写RpcEndpoint.receiveAndReply()方法来实现。
    • register():处理BlockManager注册
    • heartbeatReceived():处理BlockManager心跳
  • BlockManagerSlaveEndpoint:SlaveEndpoint配合BlockManage执行一些来自于driver和executor的要求操作(通过BlockManageMaster),其主体函数同样是receiveAndReply,不过内部执行操作的选项较少,主要包括去除Block、RDD、Broadcast,获取信息等操作,所有匹配后的具体操作都是通过相应的具体类(如BlockManage、shuffleManager等)完成。
    • receiveAndReply():接受并回复RPC消息,通过覆写RpcEndpoint.receiveAndReply()方法来实现。
  • BlockManager:块管理器BlockManager会运行在Spark集群中的所有节点上。每个节点上的BlockManager通过MemoryManager、MemoryStore、DiskBlockManager、DiskStore来管理其内存、磁盘中的块,并与其他节点进行块的交互,是一个规模庞大的组件。
    • initialize():1. 初始化BlockTransferService和ShuffleClient。2. 根据配置项spark.storage.replication.policy确定块复制策略并通过反射创建。默认值为RandomBlockReplicationPolicy,说明是将块的副本随机放到不同的节点上。3. 根据Executor ID生成BlockManagerId,并调用BlockManagerMaster.registerBlockManager()方法注册此ID与从RPC端点。注册成功后,BlockManagerMaster会返回另一个正式的ID。4. 生成Shuffle服务的ID。如果当前节点是Executor并启用了外部Shuffle服务的话,就调用registerWithExternalShuffleServer()方法注册外部Shuffle服务
    • getOrElseUpdate():用于获取Block。如果Block存在,则获取此Block并返回BlockResult,否则调用makeIterator方法计算Block,并持久化后返回BlockResult或Iterator
    • reregister():重新向driver注册blockManager方法
    • get():在getOrElseUpdate方法中被调用,该方法先调用getLocalValues()方法从本地(注意是本地Executor)读取数据,如果读取不到,就继续调用getRemoteValues()方法从远端获取数据。
    • getLocalBytes():用于存储体系获取BlockId所对应Block的数据,并封装为ChunkedByteBuffer后返回
    • getBlockData():用于获取本地Block的数据。
    • putBlockData():用于将Block数据写入本地
    • getLocalValues():用于从本地的BlockManager中获取Block数据
    • 该类方法非常繁多,可查看Spark存储体系——块管理器BlockManager文章,主要概括为读取序列化数据,读取对象数据,写入序列化,写入对象,从本地读取,从远程读取等。
  • MapOutputTracker: MapOutputTracker 是一个定位跟踪 stage 的map 输出位置的类,driver 和 executor 有对应的实现,分别是 MapOutputTrackerMaster 和 MapOutputTrackerWorker。
  • ShuffleClient:不仅是将shuffle文件上传到其他Executor或者下载远程Executor文件到本地的客户端,也是提供可以被其他Executor访问的shuffle服务。
  • BlockTransferService:是继承自ShuffleClient接口的抽象类,负责数据的传输。其中定义了blocks的批量获取、单个获取和单个同步或异步上传的接口。
    • init():初始化,在BlockManager的initialize()方法中被调用
    • uploadBlock():上传单个block块到远程节点,仅在[[init]]之后才可使用。
    • fetchBlocks():从远程节点异步获取一组blocks。注意:该API接口接收数组接口,所以可以实现批量请求。另外,该方法没有返回一个future对象,所以子类实现可以在一个block获取成功后立即回调onBlockFetchSuccess,而不是等待所有的blocks都获取成功。
    • uploadBlockSync():上传单个block块到远程节点,仅在[[init]]调用之后才可用。该方法类似于[[uploadBlock]]方法,除了该方法会阻塞线程直到block上传完成,也就是同步上传。
    • fetchBlockSync():一个特殊的[[fetchBlocks]]的例子,它阻塞式地读取一个block块,也就是同步读取。只有在调用[[init]]后才可以使用它。
  • NettyBlockTransferService:BlockTransferService的实现类实现为NettyBlockTransferService,它使用Netty异步时间驱动的网络应用框架,获取和上传远程节点上的Block集合。

重要图例

广播数据的读取流程图

TaskMemoryManager被创建和使用流程图

静态内存管理StaticMemoryManager布局图解

统一内存管理UnifiedMemoryManager布局图解

BlockManagerMaster与RPC端点间的关系

BlockManagerMaster的名字有些许误导性:实际上在每个节点都会有一个BlockManagerMaster,而不是Driver上有BlockManagerMaster,Executor上有BlockManagerSlave(当然它是不存在的)。BlockManager的主从则是靠RPC端点体系来体现的。之所以叫这个名字,可能是为了避免出现“块管理器管理器”(BlockManagerManager)这样更奇怪的名字吧。

BlockManager的读写流程(不含BlockTransferService)