批处理引擎MapReduce内部原理
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.MapReduce作业生命周期
MapReduce作业作为一种分布式应用程序,可直接运行在Hadoop资源管理系统YARN之上(MapReduce On YARN)。如下图所示,每个MapReduce应用程序由一个MRAppMaster以及一系列MapTask和ReduceTask构成,它们通过ResourceManager获得资源,并由NodeManager启动运行。
当用户向YARN中提交一个MapReduce应用程序后,YARN将分为两个阶段运行该应用程序:第一个阶段是由ResourceManager启动MRAppMaster;第二个阶段是由MRAppMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行成功。如下图所示,YRAN的工作流程分为以下几个步骤: (1)用户向YARN集群提交应用程序,该应用程序包括以下配置信息:MRAppMaster所在的jar包,启动MRAppMaster的命令及其资源要求(CPU,内存等),用户配置jar包等。 (2)ResourceManager为该应用程序分配第一个Container,并与对应的NodeManager通信,要求它在这个Container中启动应用程序的MRAppMaster。 (3)MRAppMaster启动后,首先向ResourceManager注册(告之所在节点,端口号以及访问链接等),这样,用户可以直接通过ResourceManager查看应用程序的运行状态,直到所有任务运行结束,即重复步骤4~7。 (4)MRAppMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。 (5)一旦MRAppMaster申请到(部分)资源后,则通过一定的调度算法将资源分配给内部的任务,之后与对应的NodeManager通信,要求它启动这些任务。 (6)NodeManager为任务准备运行环境(包括环境变量,jar包,二进制程序等),并将任务执行命令写到一个shell脚本中,并通过运行该脚本启动任务。 (7)启动的Map Task或Reduce Task通过RPC协议向MRAppMaster汇报自己状态和进度,以让MRAppMaster随时掌握各种任务的运行状态,从而可以在任务失败时触发相应的容错机制。(在应用程序运行过程中,用户可随时通过RPC向MRAppMaster查询应用程序的当前运行状态) (8)应用程序运行完成后,MRAppMaster通过RPC向ResourceManager注销,并关闭自己。
ResourceManager,NodeManager,MRAppMaster以及MapTask/ReduceTask管理关系如下图所示。
ResourceManager为MRAppMaster分配资源,并告之NodeManager启动它,MRAppMaster启动后,会通过心跳信息与ResourceManager之间的联系;MRAppMaster负责为Map Task/Reduce Task申请资源,并通知ResourceManager启动它们,Map Task/Reduce Task启动后,会通过心跳维持与MRAppMaster之间的关系,基于以上设计机制,接下来介绍一下MapReduce On YARN架构的容错性。
(1)YARN
YARN本身具有高度容错性,具体容错机制的实现,可参考为后期分析的关于YARN的笔记。
(2)MRAppMaster
MRAppMaster由ResourceManager管理,一旦MRAppMaster因故障挂掉,ResourceManager会重新为他分配资源,并启动之。重启后的MRAppMaster需借助上次运行时记录的信息恢复状态,包括未运行,正在运行和已运行完成的任务。
(3)MapTask/ReduceTask
任务由MRAppMaster管理,一旦MapTask/ReduceTask因故障挂掉或因程序bug阻塞住,MRAppMaster会为之重新申请资源并启动之。
二.Map Task与Reduce Task
为了帮助读者深入了解Map Task和Reduce Task内部实现原理,我们将Map Task分解成Read,Map,Collect,Spill和Combine五个阶段,将Reduce Task分解成Shuffle,Merge,Sort,Reduce和Write五个阶段。 在MapReduce计算框架中,一个应用程序被划分成Map和Reduce两个计算阶段,它们分别由一个或者多个Map Task和Reduce Task组成。其中,每个Map Task处理输入数据集合中的一片数据(split),产生若干数据片段,经分组聚集和归约后,将结果写到HDFS中,整个过程如下图所示。
总体上看,Map Task越Reduce Task之间的数据传输采用了pull模型。为了提高容错性,Map Task端将中间计算结果存放到本地磁盘上,而Reduce Task则通过HTTP协议从各个Map Task端拉取(pull)相应的待处理数据。为了更好地支持大量Reduce Task并发从Map Task端拷贝数据,Hadoop采用了Netty(https://netty.io/)作为高性能网络传输服务。
对于Map Task而言,他的执行过程可概述为:首先,通过用户提供的InputFormat将对应的split解析成一系列<key,value>,并依次交给用户编写的map()函数处理;接着按照指定的Partitioner对数据分片,以确定每对<key,value>交给哪个Reduce Task处理;之后将数据交给用户定义的Combiner进行依次本地归约(用户没有定义则直接跳过);最后将处理结果保存在本地磁盘上。
对于Reduce Task而言,由于它的输入数据来自各个Map Task,因此首先通过HTTP从各个已经运行完成的Map Task上拷贝对应的数据分片;待数据拷贝完成后,在以key为关键字对所有数据进行排序,通过排序(注意,这个排序并不是对所有数据进行重新排序哟,而是针对key进行排序),key相同的记录会被聚集到一起形成分组;然后将每组数据依次交给用户编写的reduce()函数处理,并把处理结果直接写到HDFS上。
1>.Map Task详细流程
Map Task的整体计算流程如下图所示。共分为5个阶段,分别是: (1)Read阶段 Map Task通过InputFormat,从split中解析出一系列<key,value>。 (2)Map阶段 将解析出的<key,value>依次交给用户编写的map()函数处理,并产生一系列新的<key,value>。 (3)Collect阶段 在map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果,该函数内部,它将<key,value>划分成若干个数据分片(通过调用Partitioner),并写入一个环形内存缓冲区中。 (4)Spill阶段 即“溢写”,当环形缓冲区满后,MapReduce将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行依次本地排序,并在必要时对数据进行合并,压缩等操作。 (5)Combine阶段
当所有数据处理完成后,Map Task对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
该问题实际上包含两层含义,即处理结果为何不写入内存,或者直接发送给Reduce Task? (1)首先,Map Task不能够将数据写入内存,因为一个集群中可能会同时运行多个作业,且每个作业可能分成多批运行Map Task,显然,将计算结果直接写入内存会耗光机器的内存; (2)其次,MapReduce采用的是动态调度策略,这意味着,一开始只有Map Task执行,而Reduce Task处于未调度状态,因此无法将Map Task计算结果直接发送给Reduce Task。 (3)将Map Task写入本地磁盘,使得Reduce Task执行失败时可直接从磁盘上再次读取各个Map Task的结果,而无需让所有Map Task重新执行。 总之,Map Task将处理结果写入本地磁盘主要目的是减少内存存储压力和容错。