Hadoop分布式计算架构流程分析-Hadoop商业环境实战

612 阅读9分钟

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。

1. Hadoop分布式计算架构流程分析

  • 1)在MapReduce程序读取文件的输入目录上存放相应的文件。
  • 2)客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。
  • 3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。
  • 4)MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。
  • 5)maptask利用客户指定的inputformat来读取数据,形成输入KV对。
  • 6)maptask将输入KV对传递给客户定义的map()方法,做逻辑运算
  • 7)map()运算完毕后将KV对序列化后写到环形缓冲区,环形缓冲区默认是100M,写满80%后,会根据Hash(结合Reduce Task 的数量,不会超过Reduce Task的数量)溢写,溢写过程中使缓存中的KV对按照K分区排序后不断写到磁盘文件从而形成不同的分区,若设置了本地聚合,则会合并Combiner多个溢写文件为一个。
  • 9)MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数- 量的reducetask进程,并告知reducetask进程要处理的数据分区。
  • 10)Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。
  • 11)Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。

2. Hadoop分布式计算流程图

  • 1)分布式的运算程序往往需要分成至少2个阶段。
  • 2)第一个阶段的maptask并发实例,完全并行运行,互不相干。
  • 3)第二个阶段的reduce task并发实例互不相干,但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出。
  • 4)MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个mapreduce程序,串行运行。 如下图展示了这个MapReduce过程:

3 TDW计算引擎解析——Shuffle

3.1 MapReduce的Shuffle Collect 过程详解(下图Spill过程简化了)

MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。

为什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过Shuffle来获取数据。

从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程,如图所示:

  • Spill过程包括输出、排序、溢写、合并等步骤,如图所示,期间会产生很多溢出文件,需要最终归并排序

3.2 环形数据区Collect过程

  • 每个Map任务不断地以<key, value>对的形式把数据输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

  • 这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了<key, value>数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。<key, value>数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,<key, value>数据的存储方向是向上增长,索引数据的存储方向是向下增长,

  • Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

  • 索引是对<key, value>在kvbuffer中的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个<key, value>写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个<key, value>和索引写完之后,Kvindex跳到-32位置。

  • 关于Spill触发的条件,也就是Kvbuffer用到什么程度开始Spill,还是要讲究一下的。如果把Kvbuffer用得死死得,一点缝都不剩的时候再开始Spill,那Map任务就需要等Spill完成腾出空间之后才能继续写数据;如果Kvbuffer只是满到一定程度,比如80%的时候就开始Spill,那在Spill的同时,Map任务还能继续写数据,如果Spill够快,Map可能都不需要为空闲空间而发愁。两利相衡取其大,一般选择后者。

3.3 Sort过程(hadoop环形缓冲区排序,最终改变索引)

先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据以partition为单位聚集在一起,同一partition内的数据按照key有序。

3.4 Spill 过程(Spill包括输出、索引排序、溢写、合并等步骤,Spill过程会产生很多溢出文件)

  • Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把<key, value>数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下个partition,直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。
  • 每一次Spill过程就会最少生成一个out文件,有时还会生成index文件,Spill的次数也烙印在文件名中。索引文件和数据文件的对应关系如下图所示

3.5 SortAndSpill如何一无既往地进行着数据输出

  • <key, value>只顾着闷头按照bufindex指针向上增长,kvmeta只顾着按照Kvindex向下增长,Map取kvbuffer中剩余空间的中间位置,用这个位置设置为新的分界点,然后两者就可以和谐地按照自己既定的轨迹放置数据了,当Spill完成,空间腾出之后,不需要做任何改动继续前进。

3.6 Merge(多次Spill后,out文件和Index文件会产生很多,需要归并排序成一个segment文件)

  • 在之前Spill过程中的时候为什么不直接把这些索引信息存储在内存中呢,何必又多了这步扫描的操作?特别是Spill的索引数据,之前当内存超限之后就把数据写到磁盘,现在又要从磁盘把这些数据读出来,还是需要装到更多的内存中。之所以多此一举,是因为这时kvbuffer这个内存大户已经不再使用可以回收,有内存空间来装这些数据了。

  • Merge过程主要是针对一个个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最小堆,然后从最小堆中每次取出最小的<key, value>输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。

3.7 Copy 和 Merge Sort (shuffle reduce 过程,缓存区大小是64K,超过后输出到磁盘)

  • Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。每个节点都会启动一个常驻的HTTP server,其中一项服务就是响应Reduce拖取Map数据。当有MapOutput的HTTP请求过来的时候,HTTP server就读取相应的Map输出文件中对应这个Reduce部分的数据通过网络流输出给Reduce。
  • Reduce任务拖取某个Map对应的数据,如果在内存中能放得下这次数据的话就直接把数据写到内存中。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中。
  • 如果在内存中不能放得下这个Map的数据的话,直接把Map数据写到磁盘上,在本地目录创建一个文件,从HTTP流中读取数据然后写到磁盘,使用的缓存区大小是64K。拖一个Map数据过来就会创建一个文件,当文件数量达到一定阈值时,开始启动磁盘文件merge,把这些文件合并输出到一个文件。
  • Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。

4 总结

秦凯新 于深圳