TaskTracker任务初始化及启动task源码级分析 这篇文章中分析了任务的启动,每个task都会使用一个进程占用一个JVM来执行,org.apache.hadoop.mapred.Child方法是具体的JVM启动类,其main方法中的taskFinal.run(job, umbilical)会启动具体的Task。

  Task分为两种类型:MapTask和ReduceTask,很明显,前者对应于Map任务,后者对应于Reduce任务。且MapTask分为4种:Job-setup Task、Job-cleanup Task、Task-cleanup Task和 Map Task。Job-setup Task、Job-cleanup Task分别是作业运行时启动的第一个任务和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作,比如创建和删除作业临时输出目录;Task-cleanup Task则是任务失败或者被杀死后,用于清理已写入临时目录中数据的任务;最后一种Map Task则是处理数据并将结果存到本地磁盘上。

  本节先看MapTask,Child类调用run()方法,此类任务的run()方法代码如下:  

 1  @Override
 2   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) 
 3     throws IOException, ClassNotFoundException, InterruptedException {
 4       //负责与TaskTracker的通信,通过该对象可以获得必要的对象 
 5     this.umbilical = umbilical;
 6 
 7     // start thread that will handle communication with parent
 8     // 启动Reporter线程,用来和TaskTracker交互目前运行的状态
 9     TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
10         jvmContext);
11     reporter.startCommunicationThread();
12     boolean useNewApi = job.getUseNewMapper();
13     /*用来初始化任务,主要是进行一些和任务输出相关的设置,比如创 建commiter,设置工作目录等*/ 
14     initialize(job, getJobID(), reporter, useNewApi);
15 
16     // check if it is a cleanupJobTask
17     /*以下4个if语句均是根据任务类型的不同进行相应的操作,这些方 法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关*/ 
18     if (jobCleanup) {
19       runJobCleanupTask(umbilical, reporter);
20       return;
21     }
22     if (jobSetup) {
23         //主要是创建工作目录的FileSystem对象 
24       runJobSetupTask(umbilical, reporter);
25       return;
26     }
27     if (taskCleanup) {
28         //设置任务目前所处的阶段为结束阶段,并且删除工作目录 
29       runTaskCleanupTask(umbilical, reporter);
30       return;
31     }
32     //如果不是上述四种类型,则真正运行任务
33     if (useNewApi) {
34       runNewMapper(job, splitMetaInfo, umbilical, reporter);
35     } else {
36       runOldMapper(job, splitMetaInfo, umbilical, reporter);
37     }
38     done(umbilical, reporter);//等待JobTracker的commit命令
39   }

  (1)参数TaskUmbilicalProtocol,这个协议用于Child和TaskTracker之间的通信。Child通过此协议,查看TaskTracker是否存在,取得任务,报告任务的进度,状态,出错信息,Commit文件到HDFS,并取得map结果给reduce;TaskTracker接收任务并监控任务的进度。

  (2)TaskReporter类是是Task类的内部私有类。Task.TaskReporter用于向TaskTracker提交计数器报告和状态报告,它实现了计数器报告Reporter和状态报告StatusReporter。为了不影响主线程的工作,TaskReporter有一个独立的线程,该线程通过TaskUmbilicalProtocol接口,向TaskTracker报告Task执行情况。startCommunicationThread()方法会启动线程。

  (3)useNewApi = job.getUseNewMapper()获取这个Task使用的新的API还是旧的API。mapreduce job提交流程源码级分析(一)(原创)这篇文章有讲在Job提交的时候就设置了使用新的API(包括新的Mapper和新的Reducer)。

  (4)initialize(job, getJobID(), reporter, useNewApi)该方法在父类Task中。这个方法会将Task的状态设置为RUNNING,表示正在运行;然后如果是新API会获取对应的OutputFormatClass默认是TextOutputFormat.class,新API会获取mapreduce.FileOutputCommitter旧API会获取mapred.FileOutputCommitter;再获取在MapReduce程序中通过FileOutputFormat.setOutputPath设置的输出目录,如果这个输出目录不为null且是新的API会执行else语句FileOutputFormat.setWorkOutputPath(conf, outputPath)(这个是旧版mapred.FileOutputFormat)设置工作目录,比如hdfs://IP:8020/user/XXX,IP指的是namenode,XXX指的是用户名;然后构造一个资源计算器ResourceCalculatorPlugin对象,来获取内存、CPU等资源信息。

  (5)如果jobCleanup==true(是在TaskInProgress类中设置的)表明这个task是清理Job的。直接运行runJobCleanupTask(umbilical, reporter)方法,这个方法是清理Job,包括步骤状态设置,更新状态到TaskTracker,调用org.apache.hadoop.mapreduce.OutputCommitter的相关方法,删除目录,通过done,通知TaskTracker任务完成等待commit命令。

  (6)如果jobSetup==true(是在TaskInProgress类中设置的)表明要初始化Job,直接运行runJobSetupTask(umbilical, reporter)为建立Job做准备,执行状态设置,然后调用org.apache.hadoop.mapreduce.OutputCommitter的setupJob,最后通过done,通知TaskTracker任务完成等待commit命令。

  (7)如果是taskCleanup==true(是在TaskInProgress类中设置的)表明是清理task的任务,直接运行runTaskCleanupTask(umbilical, reporter),清理Task任务,和上面(5)中runJobCleanupTask类似。

  (8)接下来才是执行Mapper的步骤,如果不是上面的5,6,7三种,如果是启用新的API(实际上是启用的,我们也只分析新API),就执行runNewMapper(job, splitMetaInfo, umbilical, reporter)方法。

  (9)done(umbilical, reporter)这个方法也被上面的5,6,7调用了,这个方法用于做结束任务的一些清理工作:更新计数器updateCounters();如果任务需要提交,设置Taks状态为COMMIT_PENDING,并利用TaskUmbilicalProtocol,汇报Task完成,等待提交,然后调用commit提交任务;设置任务结束标志位;结束Reporter通信线程;发送最后一次统计报告(通过sendLastUpdate方法);利用TaskUmbilicalProtocol报告结束状态(通过sendDone方法)。

  下面我们来看(8)中的runNewMapper(job, splitMetaInfo, umbilical, reporter)方法方法,这个方法将会构造一系列的对象来辅助执行Mapper。其代码如下:

 1 private <INKEY,INVALUE,OUTKEY,OUTVALUE>
 2   void runNewMapper(final JobConf job,
 3                     final TaskSplitIndex splitIndex,
 4                     final TaskUmbilicalProtocol umbilical,
 5                     TaskReporter reporter
 6                     ) throws IOException, ClassNotFoundException,
 7                              InterruptedException {
 8       /*TaskAttemptContext类继承于JobContext类,相对于JobContext类增加了一些有关task的信息。
 9        * 通过taskContext对象可以获得很多与任务执行相关的类,比如用户定义的Mapper类,InputFormat类等等 */ 
10     // make a task context so we can get the classes
11     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
12       new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
13     // make a mapper//创建用户自定义的Mapper类的实例
14     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
15       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
16         ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
17     // make the input format 创建用户指定的InputFormat类的实例 
18     org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
19       (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
20         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
21     // rebuild the input split  重新生成InputSplit 
22     org.apache.hadoop.mapreduce.InputSplit split = null;
23     split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
24         splitIndex.getStartOffset());
25   //根据InputFormat对象创建RecordReader对象,默认是LineRecordReader 
26     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
27       new NewTrackingRecordReader<INKEY,INVALUE>
28           (split, inputFormat, reporter, job, taskContext);
29 
30     job.setBoolean("mapred.skip.on", isSkipping());
31   //生成RecordWriter对象
32     org.apache.hadoop.mapreduce.RecordWriter output = null;
33     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
34          mapperContext = null;
35     try {
36       Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
37         org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
38         (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
39                      Configuration.class,
40                      org.apache.hadoop.mapreduce.TaskAttemptID.class,
41                      org.apache.hadoop.mapreduce.RecordReader.class,
42                      org.apache.hadoop.mapreduce.RecordWriter.class,
43                      org.apache.hadoop.mapreduce.OutputCommitter.class,
44                      org.apache.hadoop.mapreduce.StatusReporter.class,
45                      org.apache.hadoop.mapreduce.InputSplit.class});
46 
47       // get an output object
48       if (job.getNumReduceTasks() == 0) {
49          output =
50            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
51       } else {
52         output = new NewOutputCollector(taskContext, job, umbilical, reporter);
53       }
54 
55       mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
56                                                      input, output, committer,
57                                                      reporter, split);
58       /*初始化,在默认情况下调用的是LineRecordReader的initialize方 法,主要是打开输入文件并且将文件指针指向文件头*/ 
59       input.initialize(split, mapperContext);
60       mapper.run(mapperContext);    //Mapper的执行
61       input.close();
62       output.close(mapperContext);
63     } catch (NoSuchMethodException e) {
64       throw new IOException("Can't find Context constructor", e);
65     } catch (InstantiationException e) {
66       throw new IOException("Can't create Context", e);
67     } catch (InvocationTargetException e) {
68       throw new IOException("Can't invoke Context constructor", e);
69     } catch (IllegalAccessException e) {
70       throw new IOException("Can't invoke Context constructor", e);
71     }
72   }
View Code

相关文章:

  • 2021-09-11
  • 2021-12-17
  • 2021-07-01
  • 2022-01-15
  • 2022-02-02
  • 2021-05-26
  • 2022-12-23
  • 2021-10-31
猜你喜欢
  • 2022-12-23
  • 2021-06-26
  • 2021-10-17
  • 2022-12-23
  • 2021-07-13
  • 2021-08-17
相关资源
相似解决方案