MapReduce 工作流程

1,708 阅读2分钟

MapReduce 工作流程

以示例程序 wordcount为例

image.png

Map

InputFormat

InputFormat会将我们指定的输入路径中的文件按照block(默认 128M)逻辑切分成若干切片(split,如果文件不足 128M 则单独为一个切片,如果满了128M但是不满128M*1.1也单独为一个切片),然后交给RecordReader进行处理,产出若干key/value record

RecordReader

产出的key/value record会暂存在内存中的一块环形缓冲区中(逻辑上成环形),写入record时会从环形上的两个位置写入,一个位置写入record,一个位置写入record的索引inde,这样做的好处是:要想在环上找到一个record不用遍历数据量较大的record序列,而只用遍历数据量较小的index列表。

Shuffle

Ring Buffer & Excessive Writing & Combine

MapTask会根据输入的大数据源源不断的产出record,而环形缓冲的大小是有限的(假设是100M,此参数可配置),当环形缓冲的占用量达到80%(此参数可配置)时,就会对这80%record进行一个全排序(准确的说是二次排序,先按照partition有序(见Partitioner),再按照recordkey有序),如果你设置了CombinerClass那么同时会对record进行一个合并,最后写入磁盘(此过程称为溢写),形成一个首先分区号有序其次key有序的record序列;而剩下的20%则继续迎接后续写入的record

Merge Sort

这样输入的数据集就会分批次写入到硬盘中,形成多个批次内有序的record序列,然后再从硬盘中逐批读出这些序列进行一个归并排序(归并的过程中又可以应用Combiner做一个合并处理),最终产出该MapTask对应的分区号有序、同一分区内record.key有序的record序列,即将流入Reducer

Reducer

setNumReduceTask

MapTask的数量是由切片规则来决定的,输入的数据集会被切成多少片就会有多少个MapTask,每个MapTask都会产出一个分区号有序的record序列,而ReducerTask是在Driver中通过setNumReducerTask手动指定的,一般会和Partitioner返回的分区号(返回0则会由ReduceTask1处理并产出到part-r-0000)类别数保持一致。

ReduceTask

ReduceTask会从所有的MapTask的产出中抓取出分区号和自己对应的record过来,例如上图中ReduceTask1会分别抓取MapTask1MapTask2产出的record序列中分区号为0的部分,进行一个归并排序(过程中使用GroupingComparator进行分组,结果对应Reducer#reduce方法入参的Iterable values)并将结果序列中的元素(Object key,Iterable values)逐个交给Reducer#reduce进行处理,可以通过context.write写入到output对应分区号的part-r-000x中。