再谈Shuffle(Spark&MR)

1,149 阅读4分钟

MR shuffle

捋一下MR的shuffle【阅读Hadoop权威指南】:

MapTask

Map的输出并不是简单的直接写到磁盘,先写到缓冲区,当缓冲区要spill磁盘的时候对缓冲区内容进行排序。

每个MapTask(计算一个split)都有一个环形缓冲区(默认100MB,这是一个调优点,不过应该再也不写MR了),当缓冲区达到阈值(80%,这也是个可调优的地方),有一个后台线程开始负责将缓冲区内容spill磁盘。

注意这里就是并行执行了,一边在一直往缓冲区写数据,同时另一个线程负责往磁盘写。当溢写线程没来得及写时,缓冲区被打满了,这时候写缓冲区的线程会被阻塞。

从缓冲区开始向磁盘写之前,会把数据根据key进行分区,会根据key进行排序,这时候会调用分区器,分区器可以自己实现,排好序后,如果有combiner,会对同组数据进行聚合,调优点。减少了磁盘和网络IO。每个分区会写出一个文件,比如有4个分区,就会写4个文件到磁盘上。

最后会对这些小文件合并成一个文件,此时这个文件的特点就是分区有序,且分区内key有序

总结一下,MapTask输出,每次缓冲区发生一次溢写会在磁盘上出现一个合并后的文件。权威指南上说到,当至少存在3个溢写文件时,会再次进行combiner,压缩数据。这个也是有参数调的mapreduce.map.combine.minspills,当不足3个时,表明map输出不多,也不值得调用一次combiner。Map的输出默认是不进行压缩的,可以将压缩打开并且指定合适的压缩格式。

ReduceTask

reducer通过网络拉去map的输出。mapreduce.shuffle.max.threads决定开多少线程去拉。默认值是0,代表使用cpu core数量的两倍。这些参数就不再提了,真的在生产上调过,绝对刻骨铭心。

reduce是如何知道map输出的文件在哪的呢,map任务完成后,会跟任务调度这AM进行通信,然后reduce要拉去的时候也是问AM,这样就知道该去哪里拿数据了。

reduce会把要计算的分区数据开始进行merge。因为map端已经做过排序(快速排序),这时候将他们进行归并排序,就得到了key有序的集合,将每组数据传给我们自己写的reducer的reduce方法进行相应的逻辑计算。

理清shuffle到底干了什么,就好调优了,瓶颈在IO上,内存如果给够,减少溢写磁盘的次数。

Spark

再回忆一下MR。在reduce中我们能拿到一组数据进行操作。这是因为MR的整个shuffle过程是对key进行排序了。开始我被别人灌输这种思想,上游排序节省下游的效率。其实仔细想想,reduce就是对相同的key做规约操作,也就是说相同的key要相遇

Spark的shuffle也是要按key进行排序的(SortShuffleWrite) 但是还提供了一种ByPass的方式是不需要排序的。这里注意思考

ShuffleWrite

SortShuffleWrite

map端计算结果写入缓冲区ExternalSorter

可以看出spark是用了hashmap来让相同的key相遇。

再结合这里的updateFunc看看

有了这个基础,接下来从聚合算子开始看起。

groupByKey

groupByKey底层调用的是combineByKey算子,这个算子很有用,有时候我们可以使用这个算子实现自己的一些特殊逻辑。再往底层创建的是ShuffledRDD

这三个函数什么时候用的是不是已经非常清楚了,当然除了第三个mergeCombiner,这个后面说。通过看这里能发现,groupByKey其实就是给每个key创建了一个缓冲区,把相同的key的value装进去。

这里源码注释中说到,groupbykey没有用map端聚合。仔细想想,如果开启确实额外增加了开销,这也算是spark做的一点优化。

reduceByKey

reduceByKey,底层也是combineByKey

不做过多的分析了,看懂前面的到这里已经很清晰了。

// todo ... shuffle write其实没有说完,先占个坑位。

ShuffleReader

看看reduce端是怎么接受数据的,做了哪些操作。 从这里看起

直接进入关键代码了
当开启了map端聚合,以reduceByKey为例,map端输出的结果已经是聚合过的,这时候reduce端会把多个map聚合后的结果,再聚合。这就用到了之前说的第三个函数。

当没有开启map端聚合,那么典型的就是groupByKey,所有的聚合逻辑都是发生在reduce端,在reduce端会使用那三个函数。

总结

其实就规约操作这块,MR是有排序的开销,而Spark是用了HashTable来让相同的key相遇。