MapReduce的MapTask任务的运行源码级分析 这篇文章好不容易恢复了。。。谢天谢地。。。这篇文章讲了MapTask的执行流程。咱们这一节讲解ReduceTask的执行流程。ReduceTask也有四种任务,可参考前一章节对应的内容,至于Reduce Task要从各个Map Task上读取一片数据,经过排序后,以组为单位交给用户编写的reduce方法,并将结果写入HDFS中。

  MapTask和ReduceTask都是Task的子类,分别对应于我们常说的map和reduce任务。同上一节一样Child类中直接运行的是run方法,ReduceTask.run()方法代码如下:

  1  //ReduceTask.run方法开始和MapTask类似,包括initialize()初始化,根据情况看是否调用runJobCleanupTask(),
  2   //runJobSetupTask(),runTaskCleanupTask()。之后进入正式的工作,主要有这么三个步骤:Copy、Sort、Reduce。
  3   @Override
  4   @SuppressWarnings("unchecked")
  5   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  6     throws IOException, InterruptedException, ClassNotFoundException {
  7     this.umbilical = umbilical;
  8     job.setBoolean("mapred.skip.on", isSkipping());
  9     /*添加reduce过程需要经过的几个阶段。以便通知TaskTracker目前运   行的情况*/ 
 10     if (isMapOrReduce()) {
 11       copyPhase = getProgress().addPhase("copy");
 12       sortPhase  = getProgress().addPhase("sort");
 13       reducePhase = getProgress().addPhase("reduce");
 14     }
 15     // start thread that will handle communication with parent
 16  // 设置并启动reporter进程以便和TaskTracker进行交流 
 17     TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
 18         jvmContext);
 19     reporter.startCommunicationThread();
 20     //在job client中初始化job时,默认就是用新的API,详见Job.setUseNewAPI()方法
 21     boolean useNewApi = job.getUseNewReducer();        
 22     /*用来初始化任务,主要是进行一些和任务输出相关的设置,比如创建commiter,设置工作目录等*/ 
 23     initialize(job, getJobID(), reporter, useNewApi);//这里将会处理输出目录
 24     /*以下4个if语句均是根据任务类型的不同进行相应的操作,这些方 法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关*/ 
 25     // check if it is a cleanupJobTask
 26     if (jobCleanup) {
 27       runJobCleanupTask(umbilical, reporter);
 28       return;
 29     }
 30     if (jobSetup) {
 31         //主要是创建工作目录的FileSystem对象
 32       runJobSetupTask(umbilical, reporter);
 33       return;
 34     }
 35     if (taskCleanup) {
 36          //设置任务目前所处的阶段为结束阶段,并且删除工作目录 
 37       runTaskCleanupTask(umbilical, reporter);
 38       return;
 39     }
 40     
 41     // Initialize the codec
 42     codec = initCodec();
 43 
 44     boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));  //判断是否是单机hadoop
 45     if (!isLocal) {
 46         //1. Copy.就是从执行各个Map任务的服务器那里,收到map的输出文件。拷贝的任务,是由ReduceTask.ReduceCopier 类来负责。
 47         //ReduceCopier对象负责将Map函数的输出拷贝至Reduce所在机器 
 48       reduceCopier = new ReduceCopier(umbilical, job, reporter);
 49       if (!reduceCopier.fetchOutputs()) {////fetchOutputs函数负责拷贝各个Map函数的输出 
 50         if(reduceCopier.mergeThrowable instanceof FSError) {
 51           throw (FSError)reduceCopier.mergeThrowable;
 52         }
 53         throw new IOException("Task: " + getTaskID() + 
 54             " - The reduce copier failed", reduceCopier.mergeThrowable);
 55       }
 56     }
 57     copyPhase.complete();                         // copy is already complete
 58     setPhase(TaskStatus.Phase.SORT);
 59     statusUpdate(umbilical);
 60 
 61     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
 62     //2.Sort(其实相当于合并).排序工作,就相当于上述排序工作的一个延续。它会在所有的文件都拷贝完毕后进行。
 63     //使用工具类Merger归并所有的文件。经过这一个流程,一个合并了所有所需Map任务输出文件的新文件产生了。
 64     //而那些从其他各个服务器网罗过来的 Map任务输出文件,全部删除了。
 65     
 66     //根据hadoop是否分布式来决定调用哪种排序方式 
 67     RawKeyValueIterator rIter = isLocal
 68       ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
 69           job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
 70           !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
 71           new Path(getTaskID().toString()), job.getOutputKeyComparator(),
 72           reporter, spilledRecordsCounter, null)
 73       : reduceCopier.createKVIterator(job, rfs, reporter);
 74         
 75     // free up the data structures
 76     mapOutputFilesOnDisk.clear();
 77     
 78     sortPhase.complete();                         // sort is complete
 79     setPhase(TaskStatus.Phase.REDUCE); 
 80     statusUpdate(umbilical);
 81     //3.Reduce 1.Reduce任务的最后一个阶段。它会准备好Map的 keyClass("mapred.output.key.class"或"mapred.mapoutput.key.class"), 
 82     //valueClass("mapred.mapoutput.value.class"或"mapred.output.value.class")
 83     //和 Comparator (“mapred.output.value.groupfn.class”或 “mapred.output.key.comparator.class”)
 84     Class keyClass = job.getMapOutputKeyClass();
 85     Class valueClass = job.getMapOutputValueClass();
 86     RawComparator comparator = job.getOutputValueGroupingComparator();
 87     //2.根据参数useNewAPI判断执行runNewReduce还是runOldReduce。分析润runNewReduce
 88     if (useNewApi) {
 89         //3.runNewReducer
 90         //0.像报告进程书写一些信息
 91         //1.获得一个TaskAttemptContext对象。通过这个对象创建reduce、output及用于跟踪的统计output的RecordWrit、最后创建用于收集reduce结果的Context
 92         //2.reducer.run(reducerContext)开始执行reduce
 93       runNewReducer(job, umbilical, reporter, rIter, comparator, 
 94                     keyClass, valueClass);
 95     } else {
 96       runOldReducer(job, umbilical, reporter, rIter, comparator, 
 97                     keyClass, valueClass);
 98     }
 99     done(umbilical, reporter);
100   }

  (1)reduce分为三个阶段(copy就是远程拷贝Map的输出数据、sort就是对所有的数据做排序、reduce做聚集就是我们自己写的reducer),为这三个阶段分别设置Progress,用来和TaskTracker通信报道状态。

  (2)上面代码的15-40行和MapReduce的MapTask任务的运行源码级分析 中对应部分基本相同,可参考之;

  (3)codec = initCodec()这句是检查map的输出是否是压缩的,压缩的则返回压缩codec实例,否则返回null,这里讨论不压缩的;

  (4)我们讨论完全分布式的hadoop,即isLocal==false,然后构造一个ReduceCopier对象reduceCopier,并调用reduceCopier.fetchOutputs()方法拷贝各个Mapper的输出,到本地;

  (5)然后copy阶段完成,设置接下来的阶段是sort阶段,更新状态信息;

  (6)根据isLocal来选择KV迭代器,完全分布式的会使用reduceCopier.createKVIterator(job, rfs, reporter)作为KV迭代器;

  (7)sort阶段完成,设置接下来的阶段是reduce阶段,更新状态信息;

  (8)然后获取一些配置信息,并根据是否使用新API选择不同的处理方式,这里是新的API,调用runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass)会执行reducer;

  (9)done(umbilical, reporter)这个方法用于做结束任务的一些清理工作:更新计数器updateCounters();如果任务需要提交,设置Taks状态为COMMIT_PENDING,并利用TaskUmbilicalProtocol,汇报Task完成,等待提交,然后调用commit提交任务;设置任务结束标志位;结束Reporter通信线程;发送最后一次统计报告(通过sendLastUpdate方法);利用TaskUmbilicalProtocol报告结束状态(通过sendDone方法)。

  有些人将Reduce Task分为了5个阶段:一、shuffle阶段:也称为Copy阶段,就是从各个MapTask上远程拷贝一片数据,如果大小超过一定阈值就写到磁盘,否则放入内存;二、Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多和磁盘文件过多;三、sort阶段:用户编写的reduce方法的输入数据是按key进行聚集的,需要对copy过来的数据排序,这里用的是归并排序,因为Map Task的结果是有序的;四、Reduce阶段:将每组数据依次交给用户编写的Reduce方法处理;五、write阶段:就是将结果写入HDFS。

  上面的5个阶段分的比较细了,代码里分为3个阶段copy、sort、reduce,我们在eclipse运行MR程序时,控制台看到的reduce阶段的百分比就分为3个阶段各占33.3%。

  接下来重点将两个个地方:runNewReducer方法和ReduceCopier类,后者有2000多行代码,占据了ReduceTask类的绝大部分代码量。

  A、我们先看runNewReducer吧,这个比ReduceCopier更容易一些,代码如下:

 1 @SuppressWarnings("unchecked")
 2   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
 3   void runNewReducer(JobConf job,
 4                      final TaskUmbilicalProtocol umbilical,
 5                      final TaskReporter reporter,
 6                      RawKeyValueIterator rIter,
 7                      RawComparator<INKEY> comparator,
 8                      Class<INKEY> keyClass,
 9                      Class<INVALUE> valueClass
10                      ) throws IOException,InterruptedException, 
11                               ClassNotFoundException {
12     // wrap value iterator to report progress.
13     final RawKeyValueIterator rawIter = rIter;
14     rIter = new RawKeyValueIterator() {
15       public void close() throws IOException {
16         rawIter.close();
17       }
18       public DataInputBuffer getKey() throws IOException {
19         return rawIter.getKey();
20       }
21       public Progress getProgress() {
22         return rawIter.getProgress();
23       }
24       public DataInputBuffer getValue() throws IOException {
25         return rawIter.getValue();
26       }
27       public boolean next() throws IOException {
28         boolean ret = rawIter.next();
29         reducePhase.set(rawIter.getProgress().get());
30         reporter.progress();
31         return ret;
32       }
33     };
34     // make a task context so we can get the classes
35     /*TaskAttemptContext类继承于JobContext类,相对于JobContext类增加了一些有关task的信息。通过taskContext对象可以获得很多与任务执行相
36     关的类,比如用户定义的Mapper类,InputFormat类等等 */ 
37     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
38       new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
39     // make a reducer
40     //创建用户定义的Reduce类的实例 
41     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
42       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
43         ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
44 
45      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
46        new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(reduceOutputCounter,
47          job, reporter, taskContext);
48     job.setBoolean("mapred.skip.on", isSkipping());
49     org.apache.hadoop.mapreduce.Reducer.Context 
50          reducerContext = createReduceContext(reducer, job, getTaskID(),
51                                                rIter, reduceInputKeyCounter,
52                                                reduceInputValueCounter, 
53                                                trackedRW, committer,
54                                                reporter, comparator, keyClass,
55                                                valueClass);
56     reducer.run(reducerContext);
57     trackedRW.close(reducerContext);
58   }
View Code

相关文章:

  • 2022-12-23
  • 2021-10-20
  • 2021-10-31
  • 2022-02-11
  • 2021-06-26
  • 2021-11-09
  • 2021-11-20
  • 2021-05-17
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-01-05
  • 2021-09-11
  • 2021-12-17
  • 2021-07-01
  • 2022-01-15
相关资源
相似解决方案