Hadoop学习——MapReduce

605 阅读5分钟

其他更多java基础文章:
java基础学习(目录)


学习资料:
详细讲解MapReduce过程(整理补充)

MapReduce原理


1.在客户端启动一个作业。

2.向JobTracker请求一个Job ID。

3.将运行作业所需要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在JobTracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID。JAR文件默认会有10个副本(mapred.submit.replication属性控制);输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息。

4.JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行。对于map和reduce任务,TaskTracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽。这里需要强调的是:map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序JAR包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”。而分配reduce任务时并不考虑数据本地化。

5.TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户。

MapReduce 任务中 Shuffle 和排序的过程

流程分析:

  • Map 端:
  1. 每个输入分片会让一个 map 任务来处理,默认情况下,以 HDFS 的一个块的大小(默认为 64M)为一个分片,当然我们也可以设置块的大小。 map 输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为 100M,由 io.sort.mb 属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的 80%,由 io.sort.spill.percent 属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。

  2. 在写入磁盘之前,线程首先根据 reduce 任务的数目将数据划分为相同数目的分区,也就是一个 reduce 任务对应一个分区的数据。分区就是给数据打一个标签,让它被某个固定的 reduce 执行,这样做是为了避免有些 reduce 任务分配到大量数据,而有些reduce 任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进 行 hash 的过程。然后对每个分区中的数据进行排序,如果此时设置了 Combiner,将排序后的结果进行 combine 操作,这样做的目的是让尽可能少的数据写入到磁盘。

  3. 当 map 任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和 combine 操作,目的有两个: 1)尽量减少每次写入磁盘的数据量;2)尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out 设置为 true 就可以了。

  4. 将分区中的数据拷贝给相对应的 reduce 任务。分区中的数据怎么知道它对应的 reduce 是哪个呢?其实 map 任务一直和其父进程 TaskTracker 保持联系,而 TaskTracker 又一 直和 JobTracker 保持心跳。所以 JobTracker 中保存了整个集群中的宏观信息。只要 reduce 任务向 JobTracker 获取对应的 map 输出位置就 ok 了哦。 到这里, map 端就分析完了。那到底什么是 Shuffle 呢? Shuffle 的中文意思是“洗 牌”,一个 map 产生的数据,结果通过 hash 过程分区却分配给了不同的 reduce 任务,就 是一个对数据洗牌的过程。

  • Reduce 端:
  1. 每个map任务的完成时间可能不同,因此在每个任务完成时,reduce任务就开始复制其输出。这就是reduece任务的复制阶段。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,但是这个默认值可以修改设置mapreduce.reduce.shuffle属性即可。

  2. Reduce 会接收到不同 map 任务传来的数据,并且每个 map 传来的数据都是有序的。如果 reduce 端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent 属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由 mapred.job.shuffle.merge.percent 决定),则对数据合并后溢写到磁盘中。

  3. 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在 map 端还是 reduce 端, MapReduce 都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是 hadoop 的灵 魂。

  4. 合并的过程中会产生许多的中间文件(写入磁盘了),但 MapReduce 会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到 reduce函数。合并的时候