Hadoop-Yarn学习
0x01 基本概念
Yarn全名Yet Another Resource Negotiator,即资源协调/管理者,在Hadoop2中引入。
1.1 Yarn是什么
Yarn,英文全名是 Yet Another Resource Negotiator,是由雅虎开发的第二代集群资源调度器。查看论文点这里。Yarn在大数据体系中的示意图如下:
而应用层在Application层之上,如Hive等。
和第一代不同,Yarn对各个角色进行了重新抽象。Yarn把JobTracker划分为了管理集群资源的ResourceManager(以下简称RM)和管理集群上运行任务的生命周期的AppMaster(以下简称AM)。此外,还有一个负责管理上报所在节点资源、响应处理AM的任务启停请求的角色NodeManager(以下简称NM)。
- 基本的思路
AM向RM申请资源(Container),RM调度分配Container后,App拆分后的task在container上运行。NM监控该节点的Container,确保App使用的资源不会超过配额。
1.2 Yarn的架构
如上图所示,Yarn的架构里包括以下角色:
1.2.1 Client客户端
负责向Yarn提交App作业
1.2.2 Resource Manager-资源管理器
每个Yarn集群一个RM(还可以设一个standBy节点做HA),负责整个集群的计算资源的管理和分配,RM主要由以下两部分组成
- 调度器(Scheduler)
调度器根据容量、队列等限制条件,将系统中的资源(container)分配给各个正在运行的App;不负责具体应用程序相关的工作,比如监控或跟踪状态(AM负责);不负责重新启动失败任务(AM负责)。调度器是一个可拔插的组件,YARN提供了多种直接可用的调度器,比如Fair Scheduler和Capacity Scheduler等,用户还可以编写符合规范的自定义调度器。 - 应用程序管理器(ASM)
管理提交到RM的所有App。体现到代码里主要是org.apache.hadoop.yarn.server.resourcemanager.RMAppManager
1.2.3 Node Manager-节点管理器
每个Yarn集群中有多个NM,每个NM有多个Container用于Container的启动和监测(每个NM上可能有多个Container)
注:根据Yarn配置的不同,Container可能是一个Unix进程或者一个Linux cgroup实例,在受限的资源范围内(如内存、CPU等)执行特定应用程序的代码。
NM主要功能如下:
- 启动和监视节点上的计算容器(Container)
- 以心跳的形式向RM汇报本节点上的资源使用情况和各个Container的运行状态
- 接收并处理来自AM的Container启动/停止等各种请求
1.2.4 App Master-应用程序管理器
协调、管理App的所有task。AM和task都运行在container中, container资源由RM调度分配, 由NM管理
AM 主要功能如下:
- 为App向RM申请所需Container资源
- 将得到的资源进一步分配给内部的任务
- 与NM通信以启动/停止Container容器(请求带有这些信息:资源配额、安全性令牌(如果已启用)、启动Container的命令、进程环境、必要的二进制文件/ jar 等)、获取Container状态信息
- 监控App作业的所有task运行状态,并在task运行失败时重新为其申请资源以重跑
1.2.5 Container-资源的抽象
Container是资源的抽象,他表示资源分配单位(CPU,内存等)。Container是一个动态资源分配单位,可以是UNIX进程或Linux cgroup,具体决定于Yarn配置(),它内部封装了内存、CPU、磁盘、网络等资源,用cgroup等方法限定每个task的资源量。Container由RM调度, 由NM管理。
注意:Container不同于MRv1中的slot,它是一个动态资源划分单位,是根据应用程序的需求动态生成的。
0x02 Yarn作业原理
2.1 概述
- Client提交一个应用到RM,申请第一个Container来运行AM
- RM找到合适的NM,NM在Container容器内启动AM
- AM将资源申请的请求放在随NM发往RM的心跳中
- RM通知NM启动Container来运行应用程序
注意,需要注意的是,用户应用程序间各个组件的通信必须自己实现,Yarn没有提供通信工具。
2.2 作业本地性要求
在AM为tasks向RM申请Container时,该请求包括每个map task的数据本地化信息,特别是输入split所在的节点和机架(rack)信息。调度器会使用这些信息来进行调度决策。调度器会优先将task分配到所需数据所在节点执行,这就是所谓数据本地化策略,避免了远程访问数据的各种开销。
如果不能分配,那就会根据用户设置的RelaxLocality(本地化松弛,默认为true)来将task分配到本地化节点同机架的其他节点上。
比如申请处理HDFS数据的Container时,先考虑拥有该数据block副本的节点,如果没有再考虑这些副本所在节点同机架的其他节点,如果还是没有只能申请任意节点。
2.3 申请资源方式
- 在最开始就申请所有需要的资源
典型的就是Spark,申请固定的vcores, 内存,启动固定数量的Exector - 根据变化的需求来动态申请资源
典型的是MapReduce,最开始申请map task容器,此时还没有申请reduce task容器。 - 失败恢复
当有task失败时,会申请额外的容器来重跑失败的task。
2.4 MR On Yarn
2.4.1 MR作业运行流程
先说明下,每次提交Application的作业就是job,一般包括多个任务(task)。一个MapReduce job 包括多个map task和 reduce task。
我们这里引用《Hadoop权威指南》中一张十分经典的的MR Job运行在Yarn中的流程图:
下面说下详细步骤:
-
客户端进程启动job
-
向RM发出请求,获取一个代表此job的全局唯一appID
-
Client 检查job的输出说明,计算输入分片数,并将job资源(包括job的jar、Configuration、分片信息等)复制到HDFS内,以便执行task时拉取。
-
Client向ResourceManager提交job。此提交请求中包含一个ApplicationSubmissionContext,他包括了RM为该App启动AM的所有信息:
- App ID
- App User
- App Name
- App 优先级
- ContainerLaunchContext,他包括NM启动Container所需信息,包括 containerId、Container索要的资源量、user、安全token、启动container的本地资源依赖(如binary jar等)、运行环境变量、运行命令等
- maxAppAttempts AM尝试的最大重试次数
- attemptFailuresValidityInterval 故障计数时间间隔
-
该Job的AppMaster启动,分为两个步骤:
5a. RM首先为AppMaster在某个NM上分配、启动一个Container
5b. NM收到RM命令, 使用分配的Container来启动AppMaster -
AM的主类MRAppMaster做一些Job的初始化工作,如监控作业进度等。
-
通过HDFS得到由客户端计算好的输入split,然后为每个输入split创建一个map task, 再根据
mapreduce.job.reduces创建指定数量的reducer task.
然后AM决定如何运行构成整个job的tasks。如果job很小, AM根据用户配置可以选择在本节点的JVM中运行该job, 这种job称作是uber job。 -
AM为tasks向RM申请Container
如果该job不是uber类型,那么AM机会向RM请求container来运行所有的map和reduce任务。 (注:每个任务对应一个container,且只能在该container上运行)。该请求包括每个map task的数据本地化信息,特别是输入split所在的节点和机架(rack)信息。调度器会使用这些信息来进行调度决策。调度器会优先将task分配到所需数据所在节点执行,这就是所谓数据本地化策略,避免了远程访问数据的各种开销。如果不能分配,那就会根据用户设置的RelaxLocality(本地化松弛,默认为true)来将task分配到本地化节点同机架的其他节点上。
请求还包括了task的内存需求, 默认情况下map和reduce任务的内存需求都是1024MB。 可以通过
mapreduce.map.memory.mb和mapreduce.reduce.memory.mb配置。Hadoop2中分配内存的方式和Hadoop1中不一样, Hadoop1中每个tasktracker有固定数量的slot, slot是在集群配置是设置的, 每个任务运行在一个slot中, 每个slot都有最大内存限制, 这也是整个集群固定配置的。这种方式很不灵活。可能的问题如小内存需求任务占用slot导致内存利用率低或是大内存需求任务占有slot但无足够内存导致失败。
而在YARN中, 资源划分的粒度更细。App的内存需求可以介于最小内存和最大内存之间, 但必须是最小内存的整数倍,分配更灵活。 -
Container资源成功给一个task后,分两个步骤来启动该task:
a. AM向NM发送启动指定container的RPC请求StartContainersRequest;
b. NM通过收到的请求上下文中的信息带参数执行YarnChild类的main方法。 -
资源本地化-去HDFS拉取task所需资源
在运行task之前,先去HDFS拉取task需要的所有文件到本地, 比如作业配置、JAR文件等所有来自HDFS的文件(具体需求在第九步提交的StartContainersRequest中),这一步称为数据资源本地化。 -
启动Map或者ReduceTask
注意:YarnChild运行在一个专用的JVM中, 但是YARN默认不支持JVM重用,因此每一个任务都是运行在一个新的JVM中。但是可以用uber job配置
-
应用程序运行完成后,AM向RM注销并关闭自己。
2.4.2 MRJob进度和状态更新
YARN中的task将其执行进度和状态上报给AM(使用TaskUmbilicalProtocol协议), AM每隔三秒将所有task上报的状态信息合并为该Job的整体状态视图。
如果开启了verbose,Client每秒(默认值为1秒,可以通过mapreduce.client.progressmonitor.pollinterval设置)向AM请求查询任务实时的执行情况,方便用户查看。
在YARN中, 可以通过UI看到所有的App运行情况。具体来说,RM的WebUI 展示运行中的App以及对应的AM。AM展示管理的tasks进度等细节信息。
2.4.3 MRJob的完成
在Job生命周期内,Client可从AM查询Job进度, 还会每5秒(未开启verbose时的默认值,可通过mapreduce.client.completion.pollinterval设置)检查是否完成。
Job完成之后, AM和container会清理自己的工作状态, OutputCommiter的作业清理方法也会被调用。Job的信息会被Job历史服务器存储以备之后用户核查。
2.4.4 MRJob失败处理
Yarn的失败包括task失败、AM失败、NM失败甚至是RM失败,下面详细介绍下这几种情况。
2.4.4.1 task运行失败
task程序运行时异常和JVM进程突然退出会上报到AM,此次task尝试失败。
还有一种情况是task挂起。task运行过程中会定期调用TaskUmbilicalProtocol协议中ping方法联系AM,如果程序挂起导致AM一直收不到ping(通过mapreduce.task.timeout设置),就会判定该task超时,AM将该次task尝试标记为失败。如果将上述配置设为0 ,代表task不会被标记为失败,导致其资源无法释放,可能会出大问题,不推荐。
task尝试失败后,AM尝试重新执行该任务,同时要避免在失败节点上重跑。重试次数的配置如下:
map task:mapreduce.map.maxattempts,
reduce task:mapreduce.reduce.maxattempts。
task尝试失败次数超过这个阈值,就会认为这个task失败,不再进行重试。
最后说下怎么判定整个job的失败。如果一个MapReduce job中超过mapreduce.map.failures.maxpercent的map task 或者mapreduce.reduce.failures.maxpercent reduce task运行失败,就判定该job失败。
2.4.4.2 AM失败
AM会定时向RM发送心跳。如果AM故障(如硬件故障或网络错误等),RM可感知到并在新的container中运行一个新的AM实例。在默认情况下,只要AM运行失败一次,然后重试一次又失败就被认为是AM失败了不再重试。可以通过mapreduce.am.max-attempts来设置该值。
Yarn有一个全局参数yarn.resourcemanager.am.max-attempts,默认值为2,可以控制所在yarn上运行的AM的重试次数,也就是说上面讲的每个任务的重试次数不能超过这个全局参数。
MapReduce的AM恢复后,可以通过job历史来恢复故障App所运行tasks的状态,使其不需要重跑。可以通过设置yarn.app.mapreduce.am.job.recovery.enable为false,关闭此功能。
Client会向AM轮询job进度报告,当AM运行失败时,客户端需要重新定位新的AM实例。在job初始化的时候,client就通过请求RM得到并缓存了AM地址,这样做的好处是Client每次向AM查询时变得更快。在AM运行失败时,Client会因为请求超时而重新向RM请求最新的AM地址,这个过程对用户完全透明。
2.4.4.3 NM失败
NM也会定期向RM发送心跳。如果RM在10分钟内(默认值,可通过yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms设置)发现未收到任何来自某个NM节点的心跳,RM会将该节点从节点池中移除并通知该节点。
在该失败的NM上运行的所有App或AM就按照上文所述机制恢复。
注意:由于map task输出结果驻留在失败NM上的原因,所以在这些节点上运行成功的map task(所属job还未完成)还是需要重新调度运行,以免造成NM节点被踢掉后reduce任务无法拉取所需数据
最后要注意的是,如果累积的App运行失败次数过多,那么所在NM节点可能会被AM放入自己管理的黑名单(不管NM是否失败过)。对于MapReduce任务,默认累积超过3个task失败就会将此NM拉黑,任务会尽量分配到其他节点。该阈值的配置是mapreduce.job.maxtaskfailures.per.tracker。
注意:当前版本hadoop中,黑名单由应用程序的AM管理,对RM透明,也就是说就算老的App把某个NM节点拉黑,但是新的App的任务依然有可能分配到这些故障节点
2.4.4.4 RM失败
RM作为Yarn的大脑,如果失败后果十分严重,如果没有配置StandBy RM的话会存在单点故障风险,会导致所有运行的job全部失败且无法恢复。可以用共享存储解决这个问题,源码中该接口为RMStateStore,利用ZooKeeper(ZKRMStateStore)或HDFS(FileSystemRMStateStore)做了RM HA那么active RM挂掉的情况下会自动切换到standBy RM,Client无明显感知。
RM HA时可以把运行中的App(不包括AM管理的任务信息)信息存储在高可用的Zookeeper或HDFS中备份,以备failover时备节点恢复关键状态信息。NM的信息未保存,因为是心跳调度机制,当备RM节点恢复服务收到NM心跳时可以快速重构NM状态信息。
RM备节点接管后,会从上面的状态存储区中读取信息进行恢复,为所有App重启AM。
上述RM HA过程是由FailoverController自动处理,他在默认情况下运行在RM内,使用zookeeper的leader选举机制来确保同一时刻只有一个主RM。
2.5 UberJob
2.5.1 JVM重用
首先,简单回顾一下Hadoop 1.x中的JVM重用功能:用户可以通过更改配置,来指定TaskTracker在同一个JVM里面最多可以累积执行的Task的数量(默认是1)。这样的好处是减少JVM启动、退出的次数,从而达到提高任务执行效率的目的。 配置的方法也很简单:通过设置mapred-site.xml里面参数mapred.job.reuse.jvm.num.tasks的值。该值默认是1,意味着TaskTracker会为每一个Map任务或Reduce任务都启动一个JVM,当任务执行完后再退出该JVM。依次类推,如果该值设置为3,TaskTracker则会在同一个JVM里面最多依次执行3个Task,然后才会退出该JVM。
在 Yarn(Hadoop MapReduce v2)里面,不再有参数mapred.job.reuse.jvm.num.tasks,但它也有类似JVM Reuse的功能——uber。据Arun的说法,启用该功能能够让一些任务的执行效率提高2到3倍(“we’ve observed 2x-3x speedup for some jobs”)。不过,由于Yarn的结构已经大不同于MapReduce v1中JobTracker/TaskTracker的结构,因此uber的原理和配置都和之前的JVM重用机制大不相同。
2.5.2 uber的原理
Yarn的默认配置会禁用uber组件,即不允许JVM重用。我们先看看在这种情况下,Yarn是如何执行一个MapReduce job的。首先,Resource Manager里的Application Manager会为每一个application(比如一个用户提交的MapReduce Job)在NodeManager里面申请一个container,然后在该container里面启动一个Application Master。container在Yarn中是分配资源的容器(内存、cpu、硬盘等),它启动时便会相应启动一个JVM。此时,Application Master便陆续为application包含的每一个task(一个Map task或Reduce task)向Resource Manager申请一个container。等每得到一个container后,便要求该container所属的NodeManager将此container启动,然后就在这个container里面执行相应的task。等这个task执行完后,这个container便会被NodeManager收回,而container所拥有的JVM也相应地被退出。在这种情况下,可以看出每一个JVM仅会执行一Task, JVM并未被重用。
用户可以通过启用uber组件来允许JVM重用——即在同一个container里面依次执行多个task。在yarn-site.xml文件中,改变一下几个参数的配置即可启用uber的方法:
| 参数 | 默认值 | 描述 |
|---|---|---|
| mapreduce.job.ubertask.enable | (false) | 是否启用user功能。如果启用了该功能,则会将一个“小的application”的所有子task在同一个JVM里面执行,达到JVM重用的目的。这个JVM便是负责该application的ApplicationMaster所用的JVM(运行在其container里)。那具体什么样的application算是“小的application"呢?下面几个参数便是用来定义何谓一个“小的application" |
| mapreduce.job.ubertask.maxmaps | 9 | map任务数的阀值,如果一个application包含的map数小于该值的定义,那么该application就会被认为是一个小的application |
| mapreduce.job.ubertask.maxreduces | 1 | reduce任务数的阀值,如果一个application包含的reduce数小于该值的定义,那么该application就会被认为是一个小的application。不过目前Yarn不支持该值大于1的情况“CURRENTLY THE CODE CANNOT SUPPORT MORE THAN ONE REDUCE” |
| mapreduce.job.ubertask.maxbytes | application的输入大小的阀值。默认为dfs.block.size的值。当实际的输入大小部超过该值的设定,便会认为该application为一个小的application。 |
最后,我们来看当uber功能被启用的时候,Yarn是如何执行一个application的:
- Resource Manager里的Application Manager会为每一个application在NodeManager里面申请一个container,然后在该container里面启动一个Application Master。
- containe启动时便会相应启动一个JVM。
- 此时,如果uber功能被启用,并且该application被认为是一个“小的application”,那么Application Master便会将该application包含的每一个task依次在这个container里的JVM里顺序执行,直到所有task被执行完(“WIth ‘uber’ mode enabled, you’ll run everything within the container of the AM itself”)。这样Application Master便不用再为每一个task向Resource Manager去申请一个单独的container,最终达到了 JVM重用(资源重用)的目的。
0x03 Yarn调度
0x04 Yarn调优
0xFF 参考文档
- Hadoop 2.6.0-cdh5.8.2 源码
- Hadoop权威指南