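I. Overview

The previous post described how the TaskTracker launches a separate Java process to run a map task. Continuing from there: while the TaskRunner thread runs, it builds a Java command of the form java -D** Child address port taskid and starts a separate Java process with it. In its main function, Child uses TaskUmbilicalProtocol to obtain the Task to execute from the TaskTracker and calls that Task's run method. For a ReduceTask, the run method uses Java reflection to construct the Reducer and the Reducer.Context, then calls the run method of the constructed Reducer to perform the reduce operation. Unlike a map task, a reduce task must first copy the map outputs from the tasktrackers where the maps ran to the tasktracker where the reducer runs.

As its input, a reduce task needs its own partition of the output of several map tasks across the cluster. The map tasks may finish at different times, so the reduce task starts copying a map's output as soon as that map completes. This is the copy phase of the reduce task; in practice it starts several MapOutputCopier threads that copy all the map outputs. Once copying is done, the reduce task enters the sort phase, in which LocalFSMerger or InMemFSMergeThread merges the map outputs while preserving their sort order. [That is, several already-sorted files are merged, as in the merge step of merge sort.] In the reduce phase, the reduce function is called for each key in the sorted output, and the output of this phase is written directly to the file system, usually HDFS. (With HDFS, because the tasktracker node is also a DataNode, the first block replica is written to the local disk, i.e. data locality.)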
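
The merging of already-sorted map outputs described above can be illustrated with a small k-way merge. This is only a minimal sketch under stated assumptions, not Hadoop's Merger: KWayMergeSketch and Head are hypothetical names, the runs are plain in-memory lists of integers rather than map output segments, and the comparison is natural integer order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative only: merges already-sorted runs into one sorted list.
// KWayMergeSketch and Head are hypothetical names, not Hadoop classes.
final class KWayMergeSketch {

    // Pairs the current head value of one run with the iterator over the rest of that run.
    private static final class Head {
        final int value;
        final Iterator<Integer> rest;

        Head(int value, Iterator<Integer> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    static List<Integer> merge(List<List<Integer>> sortedRuns) {
        // The heap always exposes the smallest head among all runs.
        PriorityQueue<Head> heap =
                new PriorityQueue<>((a, b) -> Integer.compare(a.value, b.value));
        for (List<Integer> run : sortedRuns) {
            Iterator<Integer> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            merged.add(smallest.value);
            // Refill the heap from the run that just supplied the smallest value.
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 10), Arrays.asList(5));
        System.out.println(merge(runs)); // prints [1, 2, 3, 4, 5, 9, 10]
    }
}
```

Because every input run is already sorted, only the current head of each run has to be compared, which is why merging is much cheaper than sorting all records from scratch.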

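When a map task finishes, it notifies its parent tasktracker of the status update, and the tasktracker in turn notifies the jobtracker; both notifications travel through the heartbeat mechanism. The jobtracker therefore knows the mapping between map outputs and tasktrackers. A thread in the reducer periodically calls getMapCompletionEvents to learn the locations of the map outputs that the jobtracker tracks.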

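II. Flow Description

1. ReduceTask constructs a ReduceCopier object and calls its fetchOutputs method.

2. The fetchOutputs method of ReduceCopier creates several separate threads that cooperate while each carries out its own job.

2.1 The GetMapEventsThread thread queries the TaskTracker over RPC. For each completed event it obtains the address of the server where the map task ran, i.e. the address of the MapTask output, builds a URL, and adds it to mapLocations for the copier threads to pick up.

2.2 Several MapOutputCopier threads are created and started. They copy the map outputs from the remote servers to the local node over HTTP; an output that fits in memory is kept in memory, otherwise it is saved to a local file.

2.3 LocalFSMerger merges the map outputs on disk.

2.4 InMemFSMergeThread merges the map outputs held in memory.

3. A raw key-value iterator is built over the copied map outputs and serves as the input to reduce.

4. The runNewReducer method constructs a Reducer instance and its execution context from the configured Reducer class, then calls the reducer's run method to execute the user-defined reduce operation.

5. The Reducer's run method takes a key and the collection of values for that key (of type Iterable<VALUEIN>) from the context and calls the reducer's reduce method to process them.

6. The Reducer's reduce method is the user-defined data-processing method, and the only method the user is required to define.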
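
Steps 5 and 6 can be sketched as a loop over key-sorted records that hands each run of equal keys to reduce. This is a simplified illustration and not the Hadoop Reducer API: ReduceLoopSketch is a hypothetical class, the records are an in-memory list instead of a Context backed by the raw key-value iterator, and the reduce body (summing the values) is just an example of user code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: ReduceLoopSketch is a hypothetical name, not a Hadoop class.
final class ReduceLoopSketch {

    // Stand-in for the user-defined reduce method: sums all values of one key.
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Walks records that are already sorted by key, collects each run of
    // equal keys into one group, and calls reduce exactly once per key.
    static Map<String, Integer> run(List<Map.Entry<String, Integer>> sortedRecords) {
        Map<String, Integer> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sortedRecords.size()) {
            String key = sortedRecords.get(i).getKey();
            List<Integer> group = new ArrayList<>();
            while (i < sortedRecords.size() && sortedRecords.get(i).getKey().equals(key)) {
                group.add(sortedRecords.get(i).getValue());
                i++;
            }
            output.put(key, reduce(key, group));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleEntry<>("a", 1));
        records.add(new AbstractMap.SimpleEntry<>("a", 2));
        records.add(new AbstractMap.SimpleEntry<>("b", 5));
        System.out.println(run(records)); // prints {a=3, b=5}
    }
}
```

The grouping only works because the input is sorted by key, which is what the copy and sort phases guarantee before the reduce phase starts.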

[Figure: [Hadoop Code Notes] Hadoop Job Submission: Child Launches the Reduce Task]

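III. Code Details

1. Child's main method. Every task runs in its own separate process, and this method is the entry point of those processes. Reduce tasks and map tasks are both started from this main function, so it is not described again here; see the previous section on Child launching a map task for details.

2. ReduceTask's run method. It is called in the Child subprocess and executes the user-defined reduce operation. The first part of the code follows the same logic as MapTask: it starts a thread that reports progress to the TaskTracker through TaskUmbilicalProtocol, then runs a different method depending on the action the task requires, such as jobCleanup, jobSetup, or taskCleanup. For background on this part, see the detailed explanation of the JobTracker's heartbeat method in the article on how the TaskTracker obtains a Task. Unlike a map task, a reduce task must first copy the map outputs from the tasktrackers where the maps ran to the tasktracker where the reducer runs.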

@SuppressWarnings("unchecked")
    public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
            throws IOException, InterruptedException, ClassNotFoundException {
        job.setBoolean("mapred.skip.on", isSkipping());

        if (isMapOrReduce()) {
            copyPhase = getProgress().addPhase("copy");
            sortPhase  = getProgress().addPhase("sort");
            reducePhase = getProgress().addPhase("reduce");
        }
        // start thread that will handle communication with parent
        TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
        reporter.startCommunicationThread();
        boolean useNewApi = job.getUseNewReducer();
        initialize(job, getJobID(), reporter, useNewApi);

        // check if it is a cleanupJobTask
        if (jobCleanup) {
            runJobCleanupTask(umbilical, reporter);
            return;
        }
        if (jobSetup) {
            runJobSetupTask(umbilical, reporter);
            return;
        }
        if (taskCleanup) {
            runTaskCleanupTask(umbilical, reporter);
            return;
        }

        // Initialize the codec
        codec = initCodec();

        boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));

        // If this is not local mode (i.e. the job is configured to run distributed), start a ReduceCopier to copy the map outputs, which are the reduce inputs.
        if (!isLocal) {
            reduceCopier = new ReduceCopier(umbilical, job, reporter);
            if (!reduceCopier.fetchOutputs()) {
                if(reduceCopier.mergeThrowable instanceof FSError) {
                    LOG.error("Task: " + getTaskID() + " - FSError: " + 
                            StringUtils.stringifyException(reduceCopier.mergeThrowable));
                    umbilical.fsError(getTaskID(), 
                            reduceCopier.mergeThrowable.getMessage());
                }
                throw new IOException("Task: " + getTaskID() + 
                        " - The reduce copier failed", reduceCopier.mergeThrowable);
            }
        }
        copyPhase.complete();                       
        // Once copying is complete, enter the sort phase.
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);

        final FileSystem rfs = FileSystem.getLocal(job).getRaw();
        RawKeyValueIterator rIter = isLocal
                ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
                        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
                        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
                        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
                        reporter, spilledRecordsCounter, null)
                : reduceCopier.createKVIterator(job, rfs, reporter);

        // free up the data structures
        mapOutputFilesOnDisk.clear();

        sortPhase.complete();                         // sort is complete
        setPhase(TaskStatus.Phase.REDUCE);
        statusUpdate(umbilical);
        Class keyClass = job.getMapOutputKeyClass();
        Class valueClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputValueGroupingComparator();

        if (useNewApi) {
            runNewReducer(job, umbilical, reporter, rIter, comparator,
                    keyClass, valueClass);
        } else {
            runOldReducer(job, umbilical, reporter, rIter, comparator,
                    keyClass, valueClass);
        }
        done(umbilical, reporter);
    }

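 3. ReduceCopier's fetchOutputs method. This method copies the map outputs to the reduce side for processing. The code starts one LocalFSMerger, one InMemFSMergeThread, one GetMapEventsThread, and several MapOutputCopier threads; these separate threads cooperate while each carries out its own job.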
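
As a reading aid for the listing that follows, here is a minimal sketch of one scheduling pass of its main loop: shuffle the hosts, skip any host that already has a fetch in flight or is still penalized, and take at most one pending map output per remaining host. The names are hypothetical stand-ins (pendingByHost for mapLocations, hostsInFlight for uniqueHosts, penaltyEndByHost for penaltyBox), map outputs are plain strings, and the real code's locking and obsolete-map filtering are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Illustrative only: HostSchedulingSketch is a hypothetical name, not a Hadoop class.
final class HostSchedulingSketch {

    // Returns the map outputs chosen in this pass, at most one per host.
    // Mutates its arguments: chosen outputs leave pendingByHost, their hosts
    // join hostsInFlight, and expired penalties are removed.
    static List<String> schedule(Map<String, Deque<String>> pendingByHost,
                                 Set<String> hostsInFlight,
                                 Map<String, Long> penaltyEndByHost,
                                 long nowMillis,
                                 Random random) {
        List<String> hosts = new ArrayList<>(pendingByHost.keySet());
        // Randomize the order so that reducers do not all hit the same host first.
        Collections.shuffle(hosts, random);

        List<String> scheduled = new ArrayList<>();
        for (String host : hosts) {
            Deque<String> pending = pendingByHost.get(host);
            if (pending.isEmpty() || hostsInFlight.contains(host)) {
                continue; // nothing to fetch, or one fetch per host already running
            }
            Long penaltyEnd = penaltyEndByHost.get(host);
            if (penaltyEnd != null) {
                if (nowMillis < penaltyEnd) {
                    continue; // host is still in the penalty box
                }
                penaltyEndByHost.remove(host); // penalty has expired
            }
            scheduled.add(pending.removeFirst());
            hostsInFlight.add(host);
        }
        return scheduled;
    }
}
```

Limiting each host to one in-flight fetch is what the uniqueHosts set enforces in the listing, and the penalty map is what keeps a recently failing host from being retried immediately.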

public boolean fetchOutputs() throws IOException {
      int totalFailures = 0;
      int            numInFlight = 0, numCopied = 0;
      DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
      final Progress copyPhase = 
        reduceTask.getProgress().phase();
      LocalFSMerger localFSMergerThread = null;
      InMemFSMergeThread inMemFSMergeThread = null;
      GetMapEventsThread getMapEventsThread = null;
      
   
      for (int i = 0; i < numMaps; i++) {
        copyPhase.addPhase();       // add sub-phase per file
      }
      
      // 1) Create as many MapOutputCopier threads as the configured numCopiers (5 by default); these MapOutputCopier threads do the actual copying.
      copiers = new ArrayList<MapOutputCopier>(numCopiers);
      
      // start all the copying threads
      for (int i=0; i < numCopiers; i++) {
        MapOutputCopier copier = new MapOutputCopier(conf, reporter);
        copiers.add(copier);
        
        copier.start();
      }
      
      // 2) start the on-disk-merge thread (see the method described later)
      localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
      // 3) start the in-memory merger thread (see the method described later)
      inMemFSMergeThread = new InMemFSMergeThread();
      localFSMergerThread.start();
      inMemFSMergeThread.start();
      
      // 4) start the map events thread, which fetches map completion events
      getMapEventsThread = new GetMapEventsThread();
      getMapEventsThread.start();
      
      // start the clock for bandwidth measurement
      long startTime = System.currentTimeMillis();
      long currentTime = startTime;
      long lastProgressTime = startTime;
      long lastOutputTime = 0;
      
        // loop until we get all required outputs
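      // 5) While copiedMapOutputs holds fewer entries than the number of maps, copying is not finished, so keep looping. At intervals the loop logs how many map outputs are still needed and how many fetches are in flight.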
        while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
          
          currentTime = System.currentTimeMillis();
          boolean logNow = false;
          if (currentTime - lastOutputTime > MIN_LOG_TIME) {
            lastOutputTime = currentTime;
            logNow = true;
          }
          if (logNow) {
            LOG.info(reduceTask.getTaskID() + " Need another " 
                   + (numMaps - copiedMapOutputs.size()) + " map output(s) "
                   + "where " + numInFlight + " is already in progress");
          }

          // Put the hash entries for the failed fetches.
          Iterator<MapOutputLocation> locItr = retryFetches.iterator();

          while (locItr.hasNext()) {
            MapOutputLocation loc = locItr.next(); 
            List<MapOutputLocation> locList = 
              mapLocations.get(loc.getHost());
            
            // Check if the list exists. Map output location mapping is cleared 
            // once the jobtracker restarts and is rebuilt from scratch.
            // Note that map-output-location mapping will be recreated and hence
            // we continue with the hope that we might find some locations
            // from the rebuild map.
            if (locList != null) {
              // Add to the beginning of the list so that this map is 
              //tried again before the others and we can hasten the 
              //re-execution of this map should there be a problem
              locList.add(0, loc);
            }
          }

          if (retryFetches.size() > 0) {
            LOG.info(reduceTask.getTaskID() + ": " +  
                  "Got " + retryFetches.size() +
                  " map-outputs from previous failures");
          }
          // clear the "failed" fetches hashmap
          retryFetches.clear();

          // now walk through the cache and schedule what we can
          int numScheduled = 0;
          int numDups = 0;
          
          synchronized (scheduledCopies) {
  
            // Randomize the map output locations to prevent 
            // all reduce-tasks swamping the same tasktracker
            List<String> hostList = new ArrayList<String>();
            hostList.addAll(mapLocations.keySet()); 
            
            Collections.shuffle(hostList, this.random);
              
            Iterator<String> hostsItr = hostList.iterator();

            while (hostsItr.hasNext()) {
            
              String host = hostsItr.next();

              List<MapOutputLocation> knownOutputsByLoc = 
                mapLocations.get(host);

              // Check if the list exists. Map output location mapping is 
              // cleared once the jobtracker restarts and is rebuilt from 
              // scratch.
              // Note that map-output-location mapping will be recreated and 
              // hence we continue with the hope that we might find some 
              // locations from the rebuild map and add then for fetching.
              if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
                continue;
              }
              
              //Identify duplicate hosts here
              if (uniqueHosts.contains(host)) {
                 numDups += knownOutputsByLoc.size(); 
                 continue;
              }

              Long penaltyEnd = penaltyBox.get(host);
              boolean penalized = false;
            
              if (penaltyEnd != null) {
                if (currentTime < penaltyEnd.longValue()) {
                  penalized = true;
                } else {
                  penaltyBox.remove(host);
                }
              }
              
              if (penalized)
                continue;

              synchronized (knownOutputsByLoc) {
              
                locItr = knownOutputsByLoc.iterator();
            
                while (locItr.hasNext()) {
              
                  MapOutputLocation loc = locItr.next();
              
                  // Do not schedule fetches from OBSOLETE maps
                  if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
                    locItr.remove();
                    continue;
                  }

                  uniqueHosts.add(host);
                  scheduledCopies.add(loc);
                  locItr.remove();  // remove from knownOutputs
                  numInFlight++; numScheduled++;

                  break; //we have a map from this host
                }
              }
            }
            scheduledCopies.notifyAll();
          }

          if (numScheduled > 0 || logNow) {
            LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
                   " outputs (" + penaltyBox.size() +
                   " slow hosts and" + numDups + " dup hosts)");
          }

          if (penaltyBox.size() > 0 && logNow) {
            LOG.info("Penalized(slow) Hosts: ");
            for (String host : penaltyBox.keySet()) {
              LOG.info(host + " Will be considered after: " + 
                  ((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
            }
          }

          // if we have no copies in flight and we can't schedule anything
          // new, just wait for a bit
          try {
            if (numInFlight == 0 && numScheduled == 0) {
              // we should indicate progress as we don't want TT to think
              // we're stuck and kill us
              reporter.progress();
              Thread.sleep(5000);
            }
          } catch (InterruptedException e) { } // IGNORE
          
          while (numInFlight > 0 && mergeThrowable == null) {
            LOG.debug(reduceTask.getTaskID() + " numInFlight = " + 
                      numInFlight);
            //the call to getCopyResult will either 
            //1) return immediately with a null or a valid CopyResult object,
            //                 or
            //2) if the numInFlight is above maxInFlight, return with a 
            //   CopyResult object after getting a notification from a 
            //   fetcher thread, 
            //So, when getCopyResult returns null, we can be sure that
            //we aren't busy enough and we should go and get more mapcompletion
            //events from the tasktracker
            CopyResult cr = getCopyResult(numInFlight);

            if (cr == null) {
              break;
            }
            
            if (cr.getSuccess()) {  // a successful copy
              numCopied++;
              lastProgressTime = System.currentTimeMillis();
              reduceShuffleBytes.increment(cr.getSize());
                
              long secsSinceStart = 
                (System.currentTimeMillis()-startTime)/1000+1;
              float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
              float transferRate = mbs/secsSinceStart;
                
              copyPhase.startNextPhase();
              copyPhase.setStatus("copy (" + numCopied + " of " + numMaps 
                                  + " at " +
                                  mbpsFormat.format(transferRate) +  " MB/s)");
                
              // Note successful fetch for this mapId to invalidate
              // (possibly) old fetch-failures
              fetchFailedMaps.remove(cr.getLocation().getTaskId());
            } else if (cr.isObsolete()) {
              //ignore
              LOG.info(reduceTask.getTaskID() + 
                       " Ignoring obsolete copy result for Map Task: " + 
                       cr.getLocation().getTaskAttemptId() + " from host: " + 
                       cr.getHost());
            } else {
              retryFetches.add(cr.getLocation());
              
              // note the failed-fetch
              TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
              TaskID mapId = cr.getLocation().getTaskId();
              
              totalFailures++;
              Integer noFailedFetches = 
                mapTaskToFailedFetchesMap.get(mapTaskId);
              noFailedFetches = 
                (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
              mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
              LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
                       noFailedFetches + " from " + mapTaskId);
              
              // did the fetch fail too many times?
              // using a hybrid technique for notifying the jobtracker.
              //   a. the first notification is sent after max-retries 
              //   b. subsequent notifications are sent after 2 retries.   
              if ((noFailedFetches >= maxFetchRetriesPerMap) 
                  && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
                synchronized (ReduceTask.this) {
                  taskStatus.addFetchFailedMap(mapTaskId);
                  LOG.info("Failed to fetch map-output from " + mapTaskId + 
                           " even after MAX_FETCH_RETRIES_PER_MAP retries... "
                           + " reporting to the JobTracker");
                }
              }
              // note unique failed-fetch maps
              if (noFailedFetches == maxFetchRetriesPerMap) {
                fetchFailedMaps.add(mapId);
                  
                // did we have too many unique failed-fetch maps?
                // and did we fail on too many fetch attempts?
                // and did we progress enough
                //     or did we wait for too long without any progress?
               
                // check if the reducer is healthy
                boolean reducerHealthy = 
                    (((float)totalFailures / (totalFailures + numCopied)) 
                     < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
                
                // check if the reducer has progressed enough
                boolean reducerProgressedEnough = 
                    (((float)numCopied / numMaps) 
                     >= MIN_REQUIRED_PROGRESS_PERCENT);
                
                // check if the reducer is stalled for a long time
                // duration for which the reducer is stalled
                int stallDuration = 
                    (int)(System.currentTimeMillis() - lastProgressTime);
                // duration for which the reducer ran with progress
                int shuffleProgressDuration = 
                    (int)(lastProgressTime - startTime);
                // min time the reducer should run without getting killed
                int minShuffleRunDuration = 
                    (shuffleProgressDuration > maxMapRuntime) 
                    ? shuffleProgressDuration 
                    : maxMapRuntime;
                boolean reducerStalled = 
                    (((float)stallDuration / minShuffleRunDuration) 
                     >= MAX_ALLOWED_STALL_TIME_PERCENT);
                
                // kill if not healthy and has insufficient progress
                if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
                     fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
                    && !reducerHealthy 
                    && (!reducerProgressedEnough || reducerStalled)) { 
                  LOG.fatal("Shuffle failed with too many fetch failures " + 
                            "and insufficient progress!" +
                            "Killing task " + getTaskID() + ".");
                  umbilical.shuffleError(getTaskID(), 
                                         "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
                                         + " bailing-out.");
                }
              }
                
              // back off exponentially until num_retries <= max_retries
              // back off by max_backoff/2 on subsequent failed attempts
              currentTime = System.currentTimeMillis();
              int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
                                   ? BACKOFF_INIT 
                                     * (1 << (noFailedFetches - 1)) 
                                   : (this.maxBackoff * 1000 / 2);
              penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
              LOG.warn(reduceTask.getTaskID() + " adding host " +
                       cr.getHost() + " to penalty box, next contact in " +
                       (currentBackOff/1000) + " seconds");
            }
            uniqueHosts.remove(cr.getHost());
            numInFlight--;
          }
        }
        
        // all done, inform the copiers to exit
        exitGetMapEvents= true;
        try {
          getMapEventsThread.join();
          LOG.info("getMapsEventsThread joined.");
        } catch (Throwable t) {
          LOG.info("getMapsEventsThread threw an exception: " +
              StringUtils.stringifyException(t));
        }

        synchronized (copiers) {
          synchronized (scheduledCopies) {
            for (MapOutputCopier copier : copiers) {
              copier.interrupt();
            }
            copiers.clear();
          }
        }
        
        // copiers are done, exit and notify the waiting merge threads
        synchronized (mapOutputFilesOnDisk) {
          exitLocalFSMerge = true;
          mapOutputFilesOnDisk.notify();
        }
        
        ramManager.close();
        
        //Do a merge of in-memory files (if there are any)
        if (mergeThrowable == null) {
          try {
            // Wait for the on-disk merge to complete
            localFSMergerThread.join();
            LOG.info("Interleaved on-disk merge complete: " + 
                     mapOutputFilesOnDisk.size() + " files left.");
            
            //wait for an ongoing merge (if it is in flight) to complete
            inMemFSMergeThread.join();
            LOG.info("In-memory merge complete: " + 
                     mapOutputsFilesInMemory.size() + " files left.");
            } catch (Throwable t) {
            LOG.warn(reduceTask.getTaskID() +
                     " Final merge of the inmemory files threw an exception: " + 
                     StringUtils.stringifyException(t));
            // check if the last merge generated an error
            if (mergeThrowable != null) {
              mergeThrowable = t;
            }
            return false;
          }
        }
        return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
    }
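
The retry handling near the end of the loop above is easier to follow with concrete numbers. The sketch below restates the two formulas from the listing as small functions; FetchBackoffSketch is a hypothetical name, and the values used in main (an initial backoff of 4000 ms, 3 retries per map, a maximum backoff of 300 seconds) are illustrative assumptions, not values taken from this article.

```java
// Illustrative only: FetchBackoffSketch is a hypothetical name, not a Hadoop class.
final class FetchBackoffSketch {

    // Same shape as the listing: double the backoff on each failure while the
    // failure count is within the retry limit, then use half of the maximum backoff.
    static int backoffMillis(int failedFetches, int maxRetriesPerMap,
                             int backoffInitMillis, int maxBackoffSeconds) {
        return failedFetches <= maxRetriesPerMap
                ? backoffInitMillis * (1 << (failedFetches - 1))
                : maxBackoffSeconds * 1000 / 2;
    }

    // Same shape as the listing: the first notification goes out when the retry
    // limit is reached, and later ones after every 2 further failures.
    static boolean shouldNotifyJobTracker(int failedFetches, int maxRetriesPerMap) {
        return failedFetches >= maxRetriesPerMap
                && (failedFetches - maxRetriesPerMap) % 2 == 0;
    }

    public static void main(String[] args) {
        // Assumed example values: 4000 ms initial backoff, 3 retries, 300 s max backoff.
        for (int failures = 1; failures <= 5; failures++) {
            System.out.println(failures + " failure(s): wait "
                    + backoffMillis(failures, 3, 4000, 300) + " ms, notify="
                    + shouldNotifyJobTracker(failures, 3));
        }
    }
}
```

With these assumed values the waits are 4000, 8000, and 16000 ms for the first three failures and 150000 ms afterwards, and the jobtracker would be notified on the 3rd and 5th failures.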

Related articles: