TaskTracker任务初始化及启动task源码级分析 这篇文章中分析了任务的启动,每个task都会使用一个进程占用一个JVM来执行,org.apache.hadoop.mapred.Child方法是具体的JVM启动类,其main方法中的taskFinal.run(job, umbilical)会启动具体的Task。
Task分为两种类型:MapTask和ReduceTask,很明显,前者对应于Map任务,后者对应于Reduce任务。且MapTask分为4种:Job-setup Task、Job-cleanup Task、Task-cleanup Task和 Map Task。Job-setup Task、Job-cleanup Task分别是作业运行时启动的第一个任务和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作,比如创建和删除作业临时输出目录;Task-cleanup Task则是任务失败或者被杀死后,用于清理已写入临时目录中数据的任务;最后一种Map Task则是处理数据并将结果存到本地磁盘上。
本节先看MapTask,Child类调用run()方法,此类任务的run()方法代码如下:
1 @Override 2 public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) 3 throws IOException, ClassNotFoundException, InterruptedException { 4 //负责与TaskTracker的通信,通过该对象可以获得必要的对象 5 this.umbilical = umbilical; 6 7 // start thread that will handle communication with parent 8 // 启动Reporter线程,用来和TaskTracker交互目前运行的状态 9 TaskReporter reporter = new TaskReporter(getProgress(), umbilical, 10 jvmContext); 11 reporter.startCommunicationThread(); 12 boolean useNewApi = job.getUseNewMapper(); 13 /*用来初始化任务,主要是进行一些和任务输出相关的设置,比如创 建commiter,设置工作目录等*/ 14 initialize(job, getJobID(), reporter, useNewApi); 15 16 // check if it is a cleanupJobTask 17 /*以下4个if语句均是根据任务类型的不同进行相应的操作,这些方 法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关*/ 18 if (jobCleanup) { 19 runJobCleanupTask(umbilical, reporter); 20 return; 21 } 22 if (jobSetup) { 23 //主要是创建工作目录的FileSystem对象 24 runJobSetupTask(umbilical, reporter); 25 return; 26 } 27 if (taskCleanup) { 28 //设置任务目前所处的阶段为结束阶段,并且删除工作目录 29 runTaskCleanupTask(umbilical, reporter); 30 return; 31 } 32 //如果不是上述四种类型,则真正运行任务 33 if (useNewApi) { 34 runNewMapper(job, splitMetaInfo, umbilical, reporter); 35 } else { 36 runOldMapper(job, splitMetaInfo, umbilical, reporter); 37 } 38 done(umbilical, reporter);//等待JobTracker的commit命令 39 }
(1)参数TaskUmbilicalProtocol,这个协议用于Child和TaskTracker之间的通信。Child通过此协议,查看TaskTracker是否存在,取得任务,报告任务的进度,状态,出错信息,Commit文件到HDFS,并取得map结果给reduce;TaskTracker接收任务并监控任务的进度。
(2)TaskReporter类是是Task类的内部私有类。Task.TaskReporter用于向TaskTracker提交计数器报告和状态报告,它实现了计数器报告Reporter和状态报告StatusReporter。为了不影响主线程的工作,TaskReporter有一个独立的线程,该线程通过TaskUmbilicalProtocol接口,向TaskTracker报告Task执行情况。startCommunicationThread()方法会启动线程。
(3)useNewApi = job.getUseNewMapper()获取这个Task使用的新的API还是旧的API。mapreduce job提交流程源码级分析(一)(原创)这篇文章有讲在Job提交的时候就设置了使用新的API(包括新的Mapper和新的Reducer)。
(4)initialize(job, getJobID(), reporter, useNewApi)该方法在父类Task中。这个方法会将Task的状态设置为RUNNING,表示正在运行;然后如果是新API会获取对应的OutputFormatClass默认是TextOutputFormat.class,新API会获取mapreduce.FileOutputCommitter旧API会获取mapred.FileOutputCommitter;再获取在MapReduce程序中通过FileOutputFormat.setOutputPath设置的输出目录,如果这个输出目录不为null且是新的API会执行else语句FileOutputFormat.setWorkOutputPath(conf, outputPath)(这个是旧版mapred.FileOutputFormat)设置工作目录,比如hdfs://IP:8020/user/XXX,IP指的是namenode,XXX指的是用户名;然后构造一个资源计算器ResourceCalculatorPlugin对象,来获取内存、CPU等资源信息。
(5)如果jobCleanup==true(是在TaskInProgress类中设置的)表明这个task是清理Job的。直接运行runJobCleanupTask(umbilical, reporter)方法,这个方法是清理Job,包括步骤状态设置,更新状态到TaskTracker,调用org.apache.hadoop.mapreduce.OutputCommitter的相关方法,删除目录,通过done,通知TaskTracker任务完成等待commit命令。
(6)如果jobSetup==true(是在TaskInProgress类中设置的)表明要初始化Job,直接运行runJobSetupTask(umbilical, reporter)为建立Job做准备,执行状态设置,然后调用org.apache.hadoop.mapreduce.OutputCommitter的setupJob,最后通过done,通知TaskTracker任务完成等待commit命令。
(7)如果是taskCleanup==true(是在TaskInProgress类中设置的)表明是清理task的任务,直接运行runTaskCleanupTask(umbilical, reporter),清理Task任务,和上面(5)中runJobCleanupTask类似。
(8)接下来才是执行Mapper的步骤,如果不是上面的5,6,7三种,如果是启用新的API(实际上是启用的,我们也只分析新API),就执行runNewMapper(job, splitMetaInfo, umbilical, reporter)方法。
(9)done(umbilical, reporter)这个方法也被上面的5,6,7调用了,这个方法用于做结束任务的一些清理工作:更新计数器updateCounters();如果任务需要提交,设置Taks状态为COMMIT_PENDING,并利用TaskUmbilicalProtocol,汇报Task完成,等待提交,然后调用commit提交任务;设置任务结束标志位;结束Reporter通信线程;发送最后一次统计报告(通过sendLastUpdate方法);利用TaskUmbilicalProtocol报告结束状态(通过sendDone方法)。
下面我们来看(8)中的runNewMapper(job, splitMetaInfo, umbilical, reporter)方法方法,这个方法将会构造一系列的对象来辅助执行Mapper。其代码如下:
1 private <INKEY,INVALUE,OUTKEY,OUTVALUE> 2 void runNewMapper(final JobConf job, 3 final TaskSplitIndex splitIndex, 4 final TaskUmbilicalProtocol umbilical, 5 TaskReporter reporter 6 ) throws IOException, ClassNotFoundException, 7 InterruptedException { 8 /*TaskAttemptContext类继承于JobContext类,相对于JobContext类增加了一些有关task的信息。 9 * 通过taskContext对象可以获得很多与任务执行相关的类,比如用户定义的Mapper类,InputFormat类等等 */ 10 // make a task context so we can get the classes 11 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = 12 new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID()); 13 // make a mapper//创建用户自定义的Mapper类的实例 14 org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = 15 (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) 16 ReflectionUtils.newInstance(taskContext.getMapperClass(), job); 17 // make the input format 创建用户指定的InputFormat类的实例 18 org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = 19 (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) 20 ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); 21 // rebuild the input split 重新生成InputSplit 22 org.apache.hadoop.mapreduce.InputSplit split = null; 23 split = getSplitDetails(new Path(splitIndex.getSplitLocation()), 24 splitIndex.getStartOffset()); 25 //根据InputFormat对象创建RecordReader对象,默认是LineRecordReader 26 org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = 27 new NewTrackingRecordReader<INKEY,INVALUE> 28 (split, inputFormat, reporter, job, taskContext); 29 30 job.setBoolean("mapred.skip.on", isSkipping()); 31 //生成RecordWriter对象 32 org.apache.hadoop.mapreduce.RecordWriter output = null; 33 org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 34 mapperContext = null; 35 try { 36 Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor = 37 org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor 38 (new Class[]{org.apache.hadoop.mapreduce.Mapper.class, 39 Configuration.class, 40 org.apache.hadoop.mapreduce.TaskAttemptID.class, 41 org.apache.hadoop.mapreduce.RecordReader.class, 42 org.apache.hadoop.mapreduce.RecordWriter.class, 43 org.apache.hadoop.mapreduce.OutputCommitter.class, 44 org.apache.hadoop.mapreduce.StatusReporter.class, 45 org.apache.hadoop.mapreduce.InputSplit.class}); 46 47 // get an output object 48 if (job.getNumReduceTasks() == 0) { 49 output = 50 new NewDirectOutputCollector(taskContext, job, umbilical, reporter); 51 } else { 52 output = new NewOutputCollector(taskContext, job, umbilical, reporter); 53 } 54 55 mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(), 56 input, output, committer, 57 reporter, split); 58 /*初始化,在默认情况下调用的是LineRecordReader的initialize方 法,主要是打开输入文件并且将文件指针指向文件头*/ 59 input.initialize(split, mapperContext); 60 mapper.run(mapperContext); //Mapper的执行 61 input.close(); 62 output.close(mapperContext); 63 } catch (NoSuchMethodException e) { 64 throw new IOException("Can't find Context constructor", e); 65 } catch (InstantiationException e) { 66 throw new IOException("Can't create Context", e); 67 } catch (InvocationTargetException e) { 68 throw new IOException("Can't invoke Context constructor", e); 69 } catch (IllegalAccessException e) { 70 throw new IOException("Can't invoke Context constructor", e); 71 } 72 }