Task粒度的缓存聚合排序结构AppendOnlyMap详细剖析-Spark商业环境实战

1,471 阅读6分钟

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习。

Spark商业环境实战及调优进阶系列

1. AppendOnlyMap 何许人也?

Spark提供了AppendOnlyMap数据结构来对任务执行结果进行聚合运算。可谓一件利器,为什么这样说呢?因为Spark是基于内存运算的大数据计算引擎,即能基于内存做数据存储,也能基于内存进行插入,更新,聚合,排序等操作。由此可以看出,Spark真正把内存的使用技术发挥到了极致。

    * A simple open hash table optimized for the append-only use case, where keys
    * are never removed, but the value for each key may be changed.
    *
    * This implementation uses quadratic probing with a power-of-2 hash table
    * size, which is guaranteed to explore all spaces for each key (see
    * http://en.wikipedia.org/wiki/Quadratic_probing).
    * The map can support up to `375809638 (0.7 * 2 ^ 29)` elements.

1.1 AppendOnlyMap 内部成员及特殊使命

  • 提供对null值得缓存

  • initialCapacity : 主构造函数传入 class AppendOnlyMap[K, V](initialCapacity: Int = 64)

  • capacity :容量取值为:nextPowerOf2(initialCapacity),具体就是补零对比,相同为原值,不相同则左移加一位。

  • data : 用于保存key和聚合值得数组。new Array[AnyRef](2 * capacity)

      * Holds keys and values in the same array for memory locality;
      * specifically, the order of elements is key0, value0, key1, value1, 
      * key2, value2, etc. 
    
  • LOAD_FACTOR :默认为0.7

  • growThreshold : (LOAD_FACTOR * capacity).toInt

1.2 AppendOnlyMap 常用方法:

  • growTable :扩容容量为原先的两倍,对key进行re-hash放入新数组。Double the table's size and re-hash everything。

  • update :key和value的更新。三种情况:1:rehash(key.hashCode) & mask对应位置没有值,直接插入。2:对应位置有值且等于原先key,直接更新。3:对应位置有值且 不等于原先key,向后挪动一位。

    def update(key: K, value: V): Unit = {
       assert(!destroyed, destructionMessage)
       val k = key.asInstanceOf[AnyRef]
       if (k.eq(null)) {
         if (!haveNullValue) {
           incrementSize()
         }
         nullValue = value
         haveNullValue = true
         return
        }
        var pos = rehash(key.hashCode) & mask
        var i = 1
        while (true) {
         val curKey = data(2 * pos)
         if (curKey.eq(null)) {
           data(2 * pos) = k
           data(2 * pos + 1) = value.asInstanceOf[AnyRef]
           incrementSize()  // Since we added a new key
           return
         } else if (k.eq(curKey) || k.equals(curKey)) {
           data(2 * pos + 1) = value.asInstanceOf[AnyRef]
           return
         } else {
           val delta = i
           pos = (pos + delta) & mask
           i += 1
         }
       }
     }
    
  • changeValue :缓存聚合算法,根据指定函数进行值的聚合操作,updateFunc为匿名函数。三种情况:1:rehash(key.hashCode) & mask对应位置没有值,与NULL值聚合。2:对应位置有值且等于原先key,直接聚合。3:对应位置有值,且不等于原先key,向后挪动一位插入。

     def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
       assert(!destroyed, destructionMessage)
       val k = key.asInstanceOf[AnyRef]
       if (k.eq(null)) {
         if (!haveNullValue) {
           incrementSize()
         }
         nullValue = updateFunc(haveNullValue, nullValue)
         haveNullValue = true
         return nullValue
       }
       var pos = rehash(k.hashCode) & mask
       var i = 1
       while (true) {
         val curKey = data(2 * pos)
         if (curKey.eq(null)) {
           val newValue = updateFunc(false, null.asInstanceOf[V])
           data(2 * pos) = k
           data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
           incrementSize()
           return newValue
         } else if (k.eq(curKey) || k.equals(curKey)) {
           val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
           data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
           return newValue
         } else {
           val delta = i
           pos = (pos + delta) & mask
           i += 1
         }
       }
       null.asInstanceOf[V] // Never reached but needed to keep compiler happy
     }
    
  • destructiveSortedIterator:在不牺牲额外内存和不牺牲AppendOnlyMap的有效性的前提下,对AppendOnlyMap的data数组中的数据进行排序实现。这里使用了优化版的TimSort,英文解释如下:

      * return an iterator of the map in sorted order. This provides a way to sort the
      * map without using additional memory, at the expense of destroying the validity
      * of the map.
    

    代码片段如下:

       def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
          destroyed = true
          // Pack KV pairs into the front of the underlying array
           var keyIndex, newIndex = 0
           while (keyIndex < capacity) {
            if (data(2 * keyIndex) != null) {
              data(2 * newIndex) = data(2 * keyIndex)
              data(2 * newIndex + 1) = data(2 * keyIndex + 1)
              newIndex += 1
            }
            keyIndex += 1
          }
          assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
          new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
          new Iterator[(K, V)] {
            var i = 0
            var nullValueReady = haveNullValue
            def hasNext: Boolean = (i < newIndex || nullValueReady)
            def next(): (K, V) = {
              if (nullValueReady) {
                nullValueReady = false
                (null.asInstanceOf[K], nullValue)
              } else {
                val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
                i += 1
                item
              }
            }
          }
        }   
    

2. AppendOnlyMap 孩子的延伸特性?

  • SizeTrackingAppendOnlyMap :以自身的大小进行样本采集和大小估算。

      An append-only map that keeps track of its estimated size in bytes.
    

    SizeTrackingAppendOnlyMap的代码段,好短啊:

      private[spark] class SizeTrackingAppendOnlyMap[K, V]
        extends AppendOnlyMap[K, V] with SizeTracker
      {
        override def update(key: K, value: V): Unit = {
          super.update(key, value)
          super.afterUpdate()
        }
      
        override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
          val newValue = super.changeValue(key, updateFunc)
          super.afterUpdate()
          newValue
        }
      
        override protected def growTable(): Unit = {
          super.growTable()
          resetSamples()
        }
      }
    
  • PartitionedAppendOnlyMap :增加了partitionedDestructiveSortedIterator,调用了AppendOnlyMap的destructiveSortedIterator对底层数组进行整理和排序后获得迭代器。

    (1)主要作用是根据指定的key比较器,返回对集合中的数据按照分区Id 顺序进行迭代的迭代器。
      * Implementation of WritablePartitionedPairCollection that wraps a map in which the
      * keys are tuples of (partition ID, K)
    
      private[spark] class PartitionedAppendOnlyMap[K, V]
        extends SizeTrackingAppendOnlyMap[(Int, K), V] with
        WritablePartitionedPairCollection[K, V] {
      
            (WritablePartitionedPairCollection定义的接口,未实现)
        def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
          : Iterator[((Int, K), V)] = {
          val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
         
         (AppendOnlyMap内部方法,对底层的data数组进行整理和排序后获得迭代器)
            destructiveSortedIterator(comparator)
        }
    
    (2) 同时对插入的键值进行了扩展,增加了分区和key的键值对元祖(partition, key)类型,如下:update((partition, key), value)。
        def insert(partition: Int, key: K, value: V): Unit = {
          update((partition, key), value)
        }
      }
    

3 原创总结AppendOnlyMap牛在哪里?

  • 1 大大减少了数据占用内存的大小?
  • 2 来对中间结果进行聚合,Tim sort 优化排序算法
  • 3 Spark的Map任务逐条输出计算结果,而不是一次性输出到内存,并通过使用AppendOnlyMap缓存及其聚合算法来对中间结果进行聚合,这样大大减少了中间结果所占用的内存。
  • 4 Spark的reduce任务对拉取到的map任务中间结果逐条读取,而不是一次性读入内存,并在内存中进行聚合和排序, 本质上读入内存操作都是经过AppendOnlyMap,大大减少了数据占用内存的大小。
  • 5 通过优化的SizeTrackingAppendOnlyMap,SizeTrackingPairBuff及Tungsten的page进行溢出判断,当超过限制时,会把数据写入磁盘,放着内存溢出。

4 Spark shuffle 的流程剖析

- 1: Spark的Map任务在输出时会根据分区进行计算,并输出数据文件和索引文件。
- 2:Spark的shuffle过程会伴随着缓存,排序,聚合,溢出,合并操作。当然远端拉取Block的操作必不可少。

5 最后

秦凯新 于深圳