本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习。
Spark商业环境实战及调优进阶系列
- Spark商业环境实战-Spark内置框架rpc通讯机制及RpcEnv基础设施
- Spark商业环境实战-Spark事件监听总线流程分析
- Spark商业环境实战-Spark存储体系底层架构剖析
- Spark商业环境实战-Spark底层多个MessageLoop循环线程执行流程分析
- Spark商业环境实战-Spark二级调度系统Stage划分算法和最佳任务调度细节剖析
- Spark商业环境实战-Spark任务延迟调度及调度池Pool架构剖析
- Spark商业环境实战-Task粒度的缓存聚合排序结构AppendOnlyMap详细剖析
- Spark商业环境实战-ExternalSorter 排序器在Spark Shuffle过程中设计思路剖析
- Spark商业环境实战-StreamingContext启动流程及Dtream 模板源码剖析
- Spark商业环境实战-ReceiverTracker与BlockGenerator数据流接收过程剖析
1. AppendOnlyMap 何许人也?
Spark提供了AppendOnlyMap数据结构来对任务执行结果进行聚合运算。可谓一件利器,为什么这样说呢?因为Spark是基于内存运算的大数据计算引擎,即能基于内存做数据存储,也能基于内存进行插入,更新,聚合,排序等操作。由此可以看出,Spark真正把内存的使用技术发挥到了极致。
* A simple open hash table optimized for the append-only use case, where keys
* are never removed, but the value for each key may be changed.
*
* This implementation uses quadratic probing with a power-of-2 hash table
* size, which is guaranteed to explore all spaces for each key (see
* http://en.wikipedia.org/wiki/Quadratic_probing).
* The map can support up to `375809638 (0.7 * 2 ^ 29)` elements.
1.1 AppendOnlyMap 内部成员及特殊使命
-
提供对null值得缓存
-
initialCapacity : 主构造函数传入 class AppendOnlyMap[K, V](initialCapacity: Int = 64)
-
capacity :容量取值为:nextPowerOf2(initialCapacity),具体就是补零对比,相同为原值,不相同则左移加一位。
-
data : 用于保存key和聚合值得数组。new Array[AnyRef](2 * capacity)
* Holds keys and values in the same array for memory locality; * specifically, the order of elements is key0, value0, key1, value1, * key2, value2, etc.
-
LOAD_FACTOR :默认为0.7
-
growThreshold : (LOAD_FACTOR * capacity).toInt
1.2 AppendOnlyMap 常用方法:
-
growTable :扩容容量为原先的两倍,对key进行re-hash放入新数组。Double the table's size and re-hash everything。
-
update :key和value的更新。三种情况:1:rehash(key.hashCode) & mask对应位置没有值,直接插入。2:对应位置有值且等于原先key,直接更新。3:对应位置有值且 不等于原先key,向后挪动一位。
def update(key: K, value: V): Unit = { assert(!destroyed, destructionMessage) val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { if (!haveNullValue) { incrementSize() } nullValue = value haveNullValue = true return } var pos = rehash(key.hashCode) & mask var i = 1 while (true) { val curKey = data(2 * pos) if (curKey.eq(null)) { data(2 * pos) = k data(2 * pos + 1) = value.asInstanceOf[AnyRef] incrementSize() // Since we added a new key return } else if (k.eq(curKey) || k.equals(curKey)) { data(2 * pos + 1) = value.asInstanceOf[AnyRef] return } else { val delta = i pos = (pos + delta) & mask i += 1 } } }
-
changeValue :缓存聚合算法,根据指定函数进行值的聚合操作,updateFunc为匿名函数。三种情况:1:rehash(key.hashCode) & mask对应位置没有值,与NULL值聚合。2:对应位置有值且等于原先key,直接聚合。3:对应位置有值,且不等于原先key,向后挪动一位插入。
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { assert(!destroyed, destructionMessage) val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { if (!haveNullValue) { incrementSize() } nullValue = updateFunc(haveNullValue, nullValue) haveNullValue = true return nullValue } var pos = rehash(k.hashCode) & mask var i = 1 while (true) { val curKey = data(2 * pos) if (curKey.eq(null)) { val newValue = updateFunc(false, null.asInstanceOf[V]) data(2 * pos) = k data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] incrementSize() return newValue } else if (k.eq(curKey) || k.equals(curKey)) { val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V]) data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] return newValue } else { val delta = i pos = (pos + delta) & mask i += 1 } } null.asInstanceOf[V] // Never reached but needed to keep compiler happy }
-
destructiveSortedIterator:在不牺牲额外内存和不牺牲AppendOnlyMap的有效性的前提下,对AppendOnlyMap的data数组中的数据进行排序实现。这里使用了优化版的TimSort,英文解释如下:
* return an iterator of the map in sorted order. This provides a way to sort the * map without using additional memory, at the expense of destroying the validity * of the map.
代码片段如下:
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = { destroyed = true // Pack KV pairs into the front of the underlying array var keyIndex, newIndex = 0 while (keyIndex < capacity) { if (data(2 * keyIndex) != null) { data(2 * newIndex) = data(2 * keyIndex) data(2 * newIndex + 1) = data(2 * keyIndex + 1) newIndex += 1 } keyIndex += 1 } assert(curSize == newIndex + (if (haveNullValue) 1 else 0)) new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator) new Iterator[(K, V)] { var i = 0 var nullValueReady = haveNullValue def hasNext: Boolean = (i < newIndex || nullValueReady) def next(): (K, V) = { if (nullValueReady) { nullValueReady = false (null.asInstanceOf[K], nullValue) } else { val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V]) i += 1 item } } } }
2. AppendOnlyMap 孩子的延伸特性?
-
SizeTrackingAppendOnlyMap :以自身的大小进行样本采集和大小估算。
An append-only map that keeps track of its estimated size in bytes.
SizeTrackingAppendOnlyMap的代码段,好短啊:
private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] with SizeTracker { override def update(key: K, value: V): Unit = { super.update(key, value) super.afterUpdate() } override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { val newValue = super.changeValue(key, updateFunc) super.afterUpdate() newValue } override protected def growTable(): Unit = { super.growTable() resetSamples() } }
-
PartitionedAppendOnlyMap :增加了partitionedDestructiveSortedIterator,调用了AppendOnlyMap的destructiveSortedIterator对底层数组进行整理和排序后获得迭代器。
(1)主要作用是根据指定的key比较器,返回对集合中的数据按照分区Id 顺序进行迭代的迭代器。
* Implementation of WritablePartitionedPairCollection that wraps a map in which the * keys are tuples of (partition ID, K) private[spark] class PartitionedAppendOnlyMap[K, V] extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] { (WritablePartitionedPairCollection定义的接口,未实现) def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]) : Iterator[((Int, K), V)] = { val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator) (AppendOnlyMap内部方法,对底层的data数组进行整理和排序后获得迭代器) destructiveSortedIterator(comparator) }
(2) 同时对插入的键值进行了扩展,增加了分区和key的键值对元祖(partition, key)类型,如下:update((partition, key), value)。
def insert(partition: Int, key: K, value: V): Unit = { update((partition, key), value) } }
3 原创总结AppendOnlyMap牛在哪里?
- 1 大大减少了数据占用内存的大小?
- 2 来对中间结果进行聚合,Tim sort 优化排序算法
- 3 Spark的Map任务逐条输出计算结果,而不是一次性输出到内存,并通过使用AppendOnlyMap缓存及其聚合算法来对中间结果进行聚合,这样大大减少了中间结果所占用的内存。
- 4 Spark的reduce任务对拉取到的map任务中间结果逐条读取,而不是一次性读入内存,并在内存中进行聚合和排序, 本质上读入内存操作都是经过AppendOnlyMap,大大减少了数据占用内存的大小。
- 5 通过优化的SizeTrackingAppendOnlyMap,SizeTrackingPairBuff及Tungsten的page进行溢出判断,当超过限制时,会把数据写入磁盘,放着内存溢出。
4 Spark shuffle 的流程剖析
- 1: Spark的Map任务在输出时会根据分区进行计算,并输出数据文件和索引文件。
- 2:Spark的shuffle过程会伴随着缓存,排序,聚合,溢出,合并操作。当然远端拉取Block的操作必不可少。
5 最后
秦凯新 于深圳