Hadoop学习(二)——MapReduce\Yarn架构

1,713 阅读15分钟

其他更多java基础文章:
java基础学习(目录)


学习资料
理解Hadoop YARN架构

本文先讲MapReduce 1.x的框架。再讲MapReduce 1.x升级改进后MapReduce 2.x/Yarn的框架。目前主要是用MapReduce 2.x/Yarn的框架。

MapReduce 1.x

MapReduce 1.x重点概念

JobClient

用户编写的MapReduce程序通过Client提交到JobTracker端;同时,用户可通过Client提供的一些接口查看作业运行状态。在Hadoop内部用“作业” (Job)表示MapReduce程序。每一个Job都会在用户端通过Client类将应用程序以及参数配置Configuration打包成Jar文件存储在HDFS,并把路径提交到JobTracker,然后由JobTracker创建每一个Task(即MapTask和ReduceTask),将它们分发到各个TaskTracker服务中去执行。

JobClient提交Job的详细流程主要如下:

JobClient在获取了JobTracker为Job分配的id之后,会在JobTracker的系统目录(HDFS)下为该Job创建一个单独的目录,目录的名字即是Job的id,该目录下会包含文件job.xml、job.jar、job.split等,其中,job.xml文件记录了Job的详细配置信息,job.jar保存了用户定义的关于job的map、reduce操纵,job.split保存了job任务的切分信息。

JobTracker

JobTracker 主要负责资源监控和作业调度。JobTracker 监控所有 TaskTracker 与作业Job的健康状况,一旦发现失败情况后,其会将相应的任务转移到其他节点;同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。

以下引用 www.aboutyun.com/thread-7778…

JobTracker为作业的提交做了两件事:一.为作业生成一个Job;二.接受该作业。 我们都知道,客户端的JobClient把作业的所有相关信息都保存到了JobTracker的系统目录下(当然是HDFS了),这样做的一个最大的好处就是客户端干了它所能干的事情同时也减少了服务器端JobTracker的负载。下面就来看看JobTracker是如何来完成客户端作业的提交的吧!哦。对了,在这里我不得不提的是客户端的JobClient向JobTracker正式提交作业时直传给了它一个改作业的JobId,这是因为与Job相关的所有信息已经存在于JobTracker的系统目录下,JobTracker只要根据JobId就能得到这个Job目录。

对于上面的Job的提交处理流程,我将简单的介绍以下几个过程:

  1. 创建Job的JobInProgress
    JobInProgress对象详细的记录了Job的配置信息,以及它的执行情况,确切的来说应该是Job被分解的map、reduce任务。在JobInProgress对象的创建过程中,它主要干了两件事,一是把Job的job.xml、job.jar文件从Job目录copy到JobTracker的本地文件系统(job.xml->/jobTracker/jobid.xml,job.jar->/jobTracker/jobid.jar);二是创建JobStatus和Job的mapTask、reduceTask存队列来跟踪Job的状态信息。
  2. 检查客户端是否有权限提交Job
    JobTracker验证客户端是否有权限提交Job实际上是交给QueueManager来处理的。
  3. 检查当前mapreduce集群能够满足Job的内存需求
    客户端提交作业之前,会根据实际的应用情况配置作业任务的内存需求,同时JobTracker为了提高作业的吞吐量会限制作业任务的内存需求,所以在Job的提交时,JobTracker需要检查Job的内存需求是否满足JobTracker的设置。

上面流程已经完毕,可以总结为下图:

TaskTracker

TaskTracker会周期性地通过心跳机制将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker发送过来的命令并执行相应的操作(如启动新任务、杀死 任务等)。TaskTracker 使用“slot”等量划分本节点上的资源量。 “slot”代表计算资源(CPU、 内存等)。一个 Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为Map slot和Reduce slot 两种,分别供Map Task和Reduce Task使用。TaskTracker通过slot数目(可配置参数)限定Task的并发度。

这里可能有人会混淆JobTracker、TaskTracker和Hadoop学习(一)——hdfs架构中所讲的的DataNode、NameNode。其实JobTracker对应于NameNode,TaskTracker对应于DataNode。DataNode和NameNode是针对数据存放来而言的,JobTracker和TaskTracker是对于MapReduce执行而言的。

MapTask和ReduceTask

Task 分为 Map Task 和 Reduce Task 两种,均由TaskTracker启动。从Hadoop学习(一)——hdfs架构中我们知道,HDFS以固定大小的block 为基本单位存储数据,而对于MapReduce的输入而言,其处理单位是split。 split 与 block 的对应关系默认是1:1。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split的多少决定了Map Task的数目,因为每个split会交由一个Map Task处理。split会在后面MapReduce的执行过程中详细讲。

MapReduce 1.x工作流程

www.aboutyun.com/thread-1549…

整个MapReduce作业的工作过程,如下所示:

  1. 作业的提交 JobClient的submitJob()方法实现的作业提交过程,如下所示: 通过JobTracker的getNewJobId()请求一个新的作业ID;(2) 检查作业的输出说明(比如没有指定输出目录或输出目录已经存在,就抛出异常); 计算作业的输入分片(当分片无法计算时,比如输入路径不存在等原因,就抛出异常); 将运行作业所需的资源(比如作业Jar文件,配置文件,计算所得的输入分片等)复制到一个以作业ID命名的目录中。(集群中有多个副本可供TaskTracker访问)(3) 通过调用JobTracker的submitJob()方法告知作业准备执行。(4)

  2. 作业的初始化 JobTracker接收到对其submitJob()方法的调用后,就会把这个调用放入一个内部队列中,交由作业调度器(比如先进先出调度器,容量调度器,公平调度器等)进行调度;(5) 初始化主要是创建一个表示正在运行作业的对象——封装任务和记录信息,以便跟踪任务的状态和进程;(5) 为了创建任务运行列表,作业调度器首先从HDFS中获取JobClient已计算好的输入分片信息(6)。然后为每个分片创建一个MapTask,并且创建ReduceTask。(Task在此时被指定ID,请区分清楚Job的ID和Task的ID)。

  3. 任务的分配 TaskTracker定期通过“心跳”与JobTracker进行通信,主要是告知JobTracker自身是否还存活,以及是否已经准备好运行新的任务等;(7) JobTracker在为TaskTracker选择任务之前,必须先通过作业调度器选定任务所在的作业; 对于MapTask和ReduceTask,TaskTracker有固定数量的任务槽(准确数量由TaskTracker核的数量和内存大小来决定)。JobTracker会先将TaskTracker的MapTask填满,然后分配ReduceTask到TaskTracker; 对于MapTrask,JobTracker通过会选取一个距离其输入分片文件最近的TaskTracker。对于ReduceTask,因为无法考虑数据的本地化,所以也没有什么标准来选择哪个TaskTracker。

  4. 任务的执行 TaskTracker分配到一个任务后,通过从HDFS把作业的Jar文件复制到TaskTracker所在的文件系统(Jar本地化用来启动JVM),同时TaskTracker将应用程序所需要的全部文件从分布式缓存复制到本地磁盘;(8) TaskTracker为任务新建一个本地工作目录,并把Jar文件中的内容解压到这个文件夹中; TaskTracker启动一个新的JVM(9)来运行每个Task(包括MapTask和ReduceTask),这样Client的MapReduce就不会影响TaskTracker守护进程(比如,导致崩溃或挂起等); 子进程通过umbilical接口与父进程进行通信,Task的子进程每隔几秒便告知父进程它的进度,直到任务完成。

  5. 进程和状态的更新 一个作业和它的每个任务都有一个状态信息,包括作业或任务的运行状态,Map和Reduce的进度,计数器值,状态消息或描述(可以由用户代码来设置)。这些状态信息在作业期间不断改变,它们是如何与Client通信的呢?

  • 任务在运行时,对其进度(即任务完成的百分比)保持追踪。对于MapTask,任务进度是已处理输入所占的比例。对于ReduceTask,情况稍微有点复杂,但系统仍然会估计已处理Reduce输入的比例;
  • 这些消息通过一定的时间间隔由Child JVM—>TaskTracker—>JobTracker汇聚。JobTracker将产生一个表明所有运行作业及其任务状态的全局视图。可以通过Web UI查看。同时JobClient通过每秒查询JobTracker来获得最新状态,并且输出到控制台上。
  1. 作业的完成 当JobTracker收到作业最后一个任务已完成的通知后,便把作业的状态设置为"成功"。然后,在JobClient查询状态时,便知道作业已成功完成,于是JobClient打印一条消息告知用户,最后从runJob()方法返回。

MapReduce 1.x的缺点

随着分布式系统集群的规模和其工作负荷的增长,原框架的问题逐渐浮出水面,主要的问题集中如下:

  1. JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
  2. JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
  3. 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
  4. 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪。
  5. 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
  6. 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。

MapReduce 2.x/Yarn框架

从业界使用分布式系统的变化趋势和 hadoop 框架的长远发展来看,MapReduce 的 JobTracker/TaskTracker 机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上的缺陷。在过去的几年中,hadoop 开发团队做了一些 bug 的修复,但是最近这些修复的成本越来越高,这表明对原框架做出改变的难度越来越大。

YARN是Hadoop 2.0中的资源管理系统,它的基本设计思想是将MRv1中的JobTracker拆分成了两个独立的服务:一个全局的资源管理器ResourceManager和每个应用程序特有的ApplicationMaster。其中ResourceManager负责整个系统的资源管理和分配,而ApplicationMaster负责单个应用程序的管理。ApplicationMaster 承担了以前的 TaskTracker 的一些角色,ResourceManager 承担了 JobTracker 的角色。

YARN是一个资源管理、任务调度的框架,主要包含三大模块:ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)。其中,ResourceManager负责所有资源的监控、分配和管理;ApplicationMaster负责每一个具体应用程序的调度和协调;NodeManager负责每一个节点的维护。对于所有的applications,RM拥有绝对的控制权和对资源的分配权。而每个AM则会和RM协商资源,同时和NodeManager通信来执行和监控task。 几个模块之间的关系如图所示。

MapReduce 2.x重点概念

ResourceManager(RM)

RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)应用程序管理器(Applications Manager,AsM)

  • 调度器

    调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。

    需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(Resource Container,简称Container)表示,Container是一个动态资源分配单位,它将内存、CPU、磁盘、网络等资源封装在一起,从而限定每个任务使用的资源量。此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器,YARN提供了多种直接可用的调度器,比如Fair Scheduler和Capacity Scheduler等。

  • 应用程序管理器

    应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。

NodeManager(NM)

NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求。

ApplicationMaster(AM)

用户提交的应用程序均包含一个AM,负责为应用程序申请资源并分配给内部的任务,应用的监控,跟踪应用执行状态,重启失败任务等。ApplicationMaster是应用框架,它负责向ResourceManager协调资源,并且与NodeManager协同工作完成Task的执行和监控。MapReduce就是原生支持的一种框架,可以在YARN上运行Mapreduce作业。有很多分布式应用都开发了对应的应用程序框架,用于在YARN上运行任务,例如Spark,Storm等。如果需要,我们也可以自己写一个符合规范的YARN application。

Container

Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。每个Container可以根据需要运行ApplicationMaster、Map、Reduce或者任意的程序。

YARN应用工作流程

  1. 用户向YARN中提交应用程序,其中包括AM程序、启动AM的命令、命令参数、用户程序等;事实上,需要准确描述运行ApplicationMaster的unix进程的所有信息。提交工作通常由YarnClient来完成。

  2. RM为该应用程序分配第一个Container,并与对应的NM通信,要求它在这个Container中启动AM;

  3. AM首先向RM注册,这样用户可以直接通过RM査看应用程序的运行状态,运行状态通过 AMRMClientAsync.CallbackHandler的getProgress() 方法来传递给RM。 然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4〜7;

  4. AM采用轮询的方式通过RPC协议向RM申请和领取资源;资源的协调通过 AMRMClientAsync异步完成,相应的处理方法封装在AMRMClientAsync.CallbackHandler中。

  5. —旦AM申请到资源后,便与对应的NM通信,要求它启动任务;通常需要指定一个ContainerLaunchContext,提供Container启动时需要的信息。

  6. NM为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务;

  7. 各个任务通过某个RPC协议向AM汇报自己的状态和进度,以让AM随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;ApplicationMaster与NM的通信通过NMClientAsync object来完成,容器的所有事件通过NMClientAsync.CallbackHandler来处理。例如启动、状态更新、停止等。

  8. 应用程序运行完成后,AM向RM注销并关闭自己。