I. Overview
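The previous post described how the TaskTracker starts a separate Java process to run a map task. Picking up from there: while the TaskRunner thread runs, it builds a command of the form java -D** Child address port taskid and uses it to launch a separate Java process. In Child's main function, the process obtains the Task to execute from the TaskTracker over the TaskUmbilicalProtocol and calls that Task's run method. For a reduce task, ReduceTask's run method uses Java reflection to construct the Reducer and its Reducer.Context, then calls the run method of the constructed Reducer to perform the reduce operation. Unlike a map task, a reduce task must first copy the map output from the tasktrackers where the maps ran to the tasktracker where the reducer runs.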
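To make the reflection step concrete, here is a minimal sketch of building a reducer from a class name read out of configuration. It is plain Java, not Hadoop code: ReducerSketch, SumReducerSketch and ReflectiveReducerFactory are hypothetical names invented for this example, and the real framework additionally builds a context object and passes the job configuration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the framework's reducer contract (not Hadoop's Reducer).
interface ReducerSketch {
    int reduce(String key, List<Integer> values);
}

// A user-supplied reducer; the framework only knows its class name.
class SumReducerSketch implements ReducerSketch {
    @Override
    public int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }
}

class ReflectiveReducerFactory {
    // Loads the named class, checks that it implements ReducerSketch,
    // and calls its no-argument constructor.
    static ReducerSketch newReducer(String className) {
        try {
            Class<? extends ReducerSketch> cls =
                Class.forName(className).asSubclass(ReducerSketch.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // In a real job the name would come from the job configuration.
        ReducerSketch r = newReducer(SumReducerSketch.class.getName());
        System.out.println(r.reduce("word", Arrays.asList(1, 2, 3)));
    }
}
```

The point of the indirection is that the framework code never names the user's class at compile time; it only needs the class to be on the classpath of the child process.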
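A reduce task takes as its input its own partition of the output of a number of map tasks across the cluster. The map tasks may finish at different times, so the reduce task starts copying a map's output as soon as that map completes. This is the copy phase of the reduce task; in practice it starts several MapOutputCopier threads that together copy all the map outputs. Once copying is complete, the reduce task enters the sort phase, in which LocalFSMerger or InMemFSMergeThread merges the map outputs while preserving their sort order. [That is, several already-sorted files are merged, as in the merge step of merge sort.] In the reduce phase, the reduce function is called for each key of the sorted output, and the output of this phase is written directly to the file system, usually HDFS. (With HDFS, because the tasktracker node is also a DataNode, the first block replica is written to the local disk, i.e. data locality.)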
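The merge of already-sorted map outputs can be pictured as a k-way merge. The sketch below is an illustration under simplifying assumptions, not the Merger class used by Hadoop: each map output is modelled as an in-memory sorted list of string keys, and KWayMergeSketch is a name made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class KWayMergeSketch {
    // Merges several individually sorted runs into one sorted list.
    static List<String> merge(List<List<String>> sortedRuns) {
        // Each heap entry is {runIndex, positionInRun}; entries are ordered
        // by the key they currently point at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (a, b) -> sortedRuns.get(a[0]).get(a[1])
                          .compareTo(sortedRuns.get(b[0]).get(b[1])));
        for (int i = 0; i < sortedRuns.size(); i++) {
            if (!sortedRuns.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();            // smallest current key
            List<String> run = sortedRuns.get(head[0]);
            out.add(run.get(head[1]));
            if (head[1] + 1 < run.size()) {      // advance within the same run
                heap.add(new int[] {head[0], head[1] + 1});
            }
        }
        return out;
    }
}
```

Because every run is already sorted, only the head of each run has to be compared at any moment, which is why the reduce side never needs a full sort of the combined data.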
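When a map task finishes, it notifies its parent tasktracker of the status update, and the tasktracker in turn notifies the jobtracker; this is carried by the heartbeat mechanism. The jobtracker therefore knows the mapping between map outputs and tasktrackers. A thread in the reducer periodically polls for map completion events (getMapCompletionEvents) to learn the map output locations that the jobtracker has recorded.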
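The periodic polling can be sketched as a cursor over an append-only event log: each round asks only for events after the last one already seen and files them by host. This is a simplified model with invented names (MapEventPollerSketch, Event, pollOnce); the in-memory list stands in for the remote side, and no real RPC is made.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MapEventPollerSketch {
    // Hypothetical completion event: which map finished and on which host.
    static final class Event {
        final String mapId;
        final String host;
        Event(String mapId, String host) {
            this.mapId = mapId;
            this.host = host;
        }
    }

    private final List<Event> remoteLog;   // stands in for the remote event log
    private int fromEventId = 0;           // index of the first event not yet seen
    private final Map<String, List<String>> mapLocations = new LinkedHashMap<>();

    MapEventPollerSketch(List<Event> remoteLog) {
        this.remoteLog = remoteLog;
    }

    // One polling round: fetch at most maxEvents unseen events and
    // record each map output under the host that holds it.
    int pollOnce(int maxEvents) {
        int end = Math.min(remoteLog.size(), fromEventId + maxEvents);
        int fetched = end - fromEventId;
        for (Event e : remoteLog.subList(fromEventId, end)) {
            mapLocations.computeIfAbsent(e.host, h -> new ArrayList<>()).add(e.mapId);
        }
        fromEventId = end;
        return fetched;
    }

    Map<String, List<String>> locations() {
        return mapLocations;
    }
}
```

Keeping the cursor on the reducer side means a round that returns nothing is cheap, so the thread can simply sleep and ask again.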
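II. Flow
1. ReduceTask constructs a ReduceCopier object and calls its fetchOutputs method.
2. The fetchOutputs method of ReduceCopier starts several separate threads, which cooperate while each carries out its own job independently.
2.1 The GetMapEventsThread thread queries the TaskTracker over RPC. For each completed-map event it obtains the address of the server that ran the map task, i.e. the address of the MapTask output, builds a URL, and adds it to mapLocations for the copier threads to pick up.
2.2 Several MapOutputCopier threads are constructed and started. They copy map output from the remote servers to the local node over HTTP; output that fits in memory is kept in memory, otherwise it is saved to a local file.
2.3 LocalFSMerger merges the map outputs on disk.
2.4 InMemFSMergeThread merges the map outputs held in memory.
3. A raw key/value iterator is built over the copied map outputs and serves as the input to reduce.
4. The runNewReducer method constructs a Reducer instance and its execution context from the configured Reducer class, then calls the reducer's run method to perform the user-defined reduce operation.
5. The Reducer's run method takes from the context one key and the collection of values for that key (of type Iterable<VALUEIN>) and calls the reducer's reduce method to process them.
6. The Reducer's reduce method is the user-defined data-processing method, and the only one the user has to define.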
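Steps 3 to 6 amount to walking a sorted stream of key/value pairs, collecting the values of each run of equal keys, and calling reduce once per key. The following sketch shows that loop under simplifying assumptions: keys and values arrive as two parallel arrays that are already sorted by key, the "user-defined" reduce just sums, and GroupedReduceSketch is a name invented for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupedReduceSketch {
    // Stand-in for the user-defined reduce method: sums the values of one key.
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Stand-in for the framework's run loop. The input must already be
    // sorted by key so that equal keys are adjacent.
    static Map<String, Integer> run(String[] keys, int[] values) {
        Map<String, Integer> out = new LinkedHashMap<>();
        int i = 0;
        while (i < keys.length) {
            String key = keys[i];
            List<Integer> group = new ArrayList<>();
            // Collect every value that belongs to the current key.
            while (i < keys.length && keys[i].equals(key)) {
                group.add(values[i]);
                i++;
            }
            out.put(key, reduce(key, group));   // one reduce call per key
        }
        return out;
    }
}
```

This is why the sort phase matters: grouping by adjacency only works when all values for a key are contiguous in the input.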
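III. Code walkthrough
1. Child's main method. Every task runs in its own process, and this method is the entry point of those processes. Reduce tasks, like map tasks, are started through this main function, so it is not described again here; see the previous section on Child starting a map task.
2. ReduceTask's run method. It is called in the Child process and executes the user-defined reduce operation. The first part of the code follows the same logic as MapTask: it starts a thread that reports progress to the TaskTracker over the TaskUmbilicalProtocol, and it runs a different method depending on the action the task is asked to perform, such as jobCleanup, jobSetup, or taskCleanup. For more on that part, see the detailed explanation of the JobTracker's heartbeat method in the post on how the TaskTracker obtains a Task. Unlike a map task, a reduce task must first copy the map output from the tasktrackers where the maps ran to the tasktracker where the reducer runs.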
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean("mapred.skip.on", isSkipping());

  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  // start thread that will handle communication with parent
  TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
  reporter.startCommunicationThread();
  boolean useNewApi = job.getUseNewReducer();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  // Initialize the codec
  codec = initCodec();

  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  // Unless the job runs in local mode (mapred.job.tracker is "local", i.e. not distributed),
  // start a ReduceCopier to copy the map outputs, which are the reduce input.
  if (!isLocal) {
    reduceCopier = new ReduceCopier(umbilical, job, reporter);
    if (!reduceCopier.fetchOutputs()) {
      if(reduceCopier.mergeThrowable instanceof FSError) {
        LOG.error("Task: " + getTaskID() + " - FSError: " +
            StringUtils.stringifyException(reduceCopier.mergeThrowable));
        umbilical.fsError(getTaskID(),
            reduceCopier.mergeThrowable.getMessage());
      }
      throw new IOException("Task: " + getTaskID() +
          " - The reduce copier failed", reduceCopier.mergeThrowable);
    }
  }
  copyPhase.complete();
  // Copying is done; enter the sort phase.
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);

  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
        reporter, spilledRecordsCounter, null)
    : reduceCopier.createKVIterator(job, rfs, reporter);

  // free up the data structures
  mapOutputFilesOnDisk.clear();

  sortPhase.complete(); // sort is complete
  setPhase(TaskStatus.Phase.REDUCE);
  statusUpdate(umbilical);
  Class keyClass = job.getMapOutputKeyClass();
  Class valueClass = job.getMapOutputValueClass();
  RawComparator comparator = job.getOutputValueGroupingComparator();

  if (useNewApi) {
    runNewReducer(job, umbilical, reporter, rIter, comparator,
                  keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter, rIter, comparator,
                  keyClass, valueClass);
  }
  done(umbilical, reporter);
}
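3. The fetchOutputs method of the ReduceCopier class. This method copies the map outputs to the reduce side for processing. As the code shows, it starts one LocalFSMerger, one InMemFSMergeThread, one GetMapEventsThread, and several MapOutputCopier threads. These separate threads cooperate while each carries out its own job independently.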
public boolean fetchOutputs() throws IOException {
  int totalFailures = 0;
  int numInFlight = 0, numCopied = 0;
  DecimalFormat mbpsFormat = new DecimalFormat("0.00");
  final Progress copyPhase = reduceTask.getProgress().phase();
  LocalFSMerger localFSMergerThread = null;
  InMemFSMergeThread inMemFSMergeThread = null;
  GetMapEventsThread getMapEventsThread = null;

  for (int i = 0; i < numMaps; i++) {
    copyPhase.addPhase(); // add sub-phase per file
  }

  // 1) Build numCopiers MapOutputCopier threads as configured (5 by default);
  //    these threads do the actual copying.
  copiers = new ArrayList<MapOutputCopier>(numCopiers);

  // start all the copying threads
  for (int i=0; i < numCopiers; i++) {
    MapOutputCopier copier = new MapOutputCopier(conf, reporter);
    copiers.add(copier);
    copier.start();
  }

  // start the on-disk-merge thread
  // 2) Start the on-disk merge thread (see the method later on).
  localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
  // start the in memory merger thread
  // 3) Start the in-memory merge thread (see the method later on).
  inMemFSMergeThread = new InMemFSMergeThread();
  localFSMergerThread.start();
  inMemFSMergeThread.start();

  // start the map events thread
  // 4) Start the thread that fetches map completion events.
  getMapEventsThread = new GetMapEventsThread();
  getMapEventsThread.start();

  // start the clock for bandwidth measurement
  long startTime = System.currentTimeMillis();
  long currentTime = startTime;
  long lastProgressTime = startTime;
  long lastOutputTime = 0;

  // loop until we get all required outputs
  // 5) While copiedMapOutputs holds fewer entries than there are maps, copying is
  //    not finished, so keep looping. The loop logs periodically how many map
  //    outputs are still needed and how many are in progress.
  while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {

    currentTime = System.currentTimeMillis();
    boolean logNow = false;
    if (currentTime - lastOutputTime > MIN_LOG_TIME) {
      lastOutputTime = currentTime;
      logNow = true;
    }
    if (logNow) {
      LOG.info(reduceTask.getTaskID() + " Need another "
          + (numMaps - copiedMapOutputs.size()) + " map output(s) "
          + "where " + numInFlight + " is already in progress");
    }

    // Put the hash entries for the failed fetches.
    Iterator<MapOutputLocation> locItr = retryFetches.iterator();

    while (locItr.hasNext()) {
      MapOutputLocation loc = locItr.next();
      List<MapOutputLocation> locList = mapLocations.get(loc.getHost());

      // Check if the list exists. Map output location mapping is cleared
      // once the jobtracker restarts and is rebuilt from scratch.
      // Note that map-output-location mapping will be recreated and hence
      // we continue with the hope that we might find some locations
      // from the rebuild map.
      if (locList != null) {
        // Add to the beginning of the list so that this map is
        //tried again before the others and we can hasten the
        //re-execution of this map should there be a problem
        locList.add(0, loc);
      }
    }

    if (retryFetches.size() > 0) {
      LOG.info(reduceTask.getTaskID() + ": " +
          "Got " + retryFetches.size() +
          " map-outputs from previous failures");
    }
    // clear the "failed" fetches hashmap
    retryFetches.clear();

    // now walk through the cache and schedule what we can
    int numScheduled = 0;
    int numDups = 0;

    synchronized (scheduledCopies) {

      // Randomize the map output locations to prevent
      // all reduce-tasks swamping the same tasktracker
      List<String> hostList = new ArrayList<String>();
      hostList.addAll(mapLocations.keySet());

      Collections.shuffle(hostList, this.random);

      Iterator<String> hostsItr = hostList.iterator();

      while (hostsItr.hasNext()) {

        String host = hostsItr.next();

        List<MapOutputLocation> knownOutputsByLoc = mapLocations.get(host);

        // Check if the list exists. Map output location mapping is
        // cleared once the jobtracker restarts and is rebuilt from
        // scratch.
        // Note that map-output-location mapping will be recreated and
        // hence we continue with the hope that we might find some
        // locations from the rebuild map and add then for fetching.
        if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
          continue;
        }

        //Identify duplicate hosts here
        if (uniqueHosts.contains(host)) {
          numDups += knownOutputsByLoc.size();
          continue;
        }

        Long penaltyEnd = penaltyBox.get(host);
        boolean penalized = false;

        if (penaltyEnd != null) {
          if (currentTime < penaltyEnd.longValue()) {
            penalized = true;
          } else {
            penaltyBox.remove(host);
          }
        }

        if (penalized)
          continue;

        synchronized (knownOutputsByLoc) {

          locItr = knownOutputsByLoc.iterator();

          while (locItr.hasNext()) {

            MapOutputLocation loc = locItr.next();

            // Do not schedule fetches from OBSOLETE maps
            if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
              locItr.remove();
              continue;
            }

            uniqueHosts.add(host);
            scheduledCopies.add(loc);
            locItr.remove(); // remove from knownOutputs
            numInFlight++;
            numScheduled++;

            break; //we have a map from this host
          }
        }
      }
      scheduledCopies.notifyAll();
    }

    if (numScheduled > 0 || logNow) {
      LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
          " outputs (" + penaltyBox.size() +
          " slow hosts and" + numDups + " dup hosts)");
    }

    if (penaltyBox.size() > 0 && logNow) {
      LOG.info("Penalized(slow) Hosts: ");
      for (String host : penaltyBox.keySet()) {
        LOG.info(host + " Will be considered after: " +
            ((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
      }
    }

    // if we have no copies in flight and we can't schedule anything
    // new, just wait for a bit
    try {
      if (numInFlight == 0 && numScheduled == 0) {
        // we should indicate progress as we don't want TT to think
        // we're stuck and kill us
        reporter.progress();
        Thread.sleep(5000);
      }
    } catch (InterruptedException e) { } // IGNORE

    while (numInFlight > 0 && mergeThrowable == null) {
      LOG.debug(reduceTask.getTaskID() + " numInFlight = " +
          numInFlight);
      //the call to getCopyResult will either
      //1) return immediately with a null or a valid CopyResult object,
      //                 or
      //2) if the numInFlight is above maxInFlight, return with a
      //   CopyResult object after getting a notification from a
      //   fetcher thread,
      //So, when getCopyResult returns null, we can be sure that
      //we aren't busy enough and we should go and get more mapcompletion
      //events from the tasktracker
      CopyResult cr = getCopyResult(numInFlight);

      if (cr == null) {
        break;
      }

      if (cr.getSuccess()) {  // a successful copy
        numCopied++;
        lastProgressTime = System.currentTimeMillis();
        reduceShuffleBytes.increment(cr.getSize());

        long secsSinceStart =
          (System.currentTimeMillis()-startTime)/1000+1;
        float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
        float transferRate = mbs/secsSinceStart;

        copyPhase.startNextPhase();
        copyPhase.setStatus("copy (" + numCopied + " of " + numMaps
            + " at " +
            mbpsFormat.format(transferRate) + " MB/s)");

        // Note successful fetch for this mapId to invalidate
        // (possibly) old fetch-failures
        fetchFailedMaps.remove(cr.getLocation().getTaskId());
      } else if (cr.isObsolete()) {
        //ignore
        LOG.info(reduceTask.getTaskID() +
            " Ignoring obsolete copy result for Map Task: " +
            cr.getLocation().getTaskAttemptId() + " from host: " +
            cr.getHost());
      } else {
        retryFetches.add(cr.getLocation());

        // note the failed-fetch
        TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
        TaskID mapId = cr.getLocation().getTaskId();

        totalFailures++;
        Integer noFailedFetches =
          mapTaskToFailedFetchesMap.get(mapTaskId);
        noFailedFetches =
          (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
        mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
        LOG.info("Task " + getTaskID() + ": Failed fetch #" +
            noFailedFetches + " from " + mapTaskId);

        // did the fetch fail too many times?
        // using a hybrid technique for notifying the jobtracker.
        //   a. the first notification is sent after max-retries
        //   b. subsequent notifications are sent after 2 retries.
        if ((noFailedFetches >= maxFetchRetriesPerMap)
            && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
          synchronized (ReduceTask.this) {
            taskStatus.addFetchFailedMap(mapTaskId);
            LOG.info("Failed to fetch map-output from " + mapTaskId +
                " even after MAX_FETCH_RETRIES_PER_MAP retries... "
                + " reporting to the JobTracker");
          }
        }
        // note unique failed-fetch maps
        if (noFailedFetches == maxFetchRetriesPerMap) {
          fetchFailedMaps.add(mapId);

          // did we have too many unique failed-fetch maps?
          // and did we fail on too many fetch attempts?
          // and did we progress enough
          //     or did we wait for too long without any progress?

          // check if the reducer is healthy
          boolean reducerHealthy =
            (((float)totalFailures / (totalFailures + numCopied))
             < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);

          // check if the reducer has progressed enough
          boolean reducerProgressedEnough =
            (((float)numCopied / numMaps)
             >= MIN_REQUIRED_PROGRESS_PERCENT);

          // check if the reducer is stalled for a long time
          // duration for which the reducer is stalled
          int stallDuration =
            (int)(System.currentTimeMillis() - lastProgressTime);
          // duration for which the reducer ran with progress
          int shuffleProgressDuration =
            (int)(lastProgressTime - startTime);
          // min time the reducer should run without getting killed
          int minShuffleRunDuration =
            (shuffleProgressDuration > maxMapRuntime)
            ? shuffleProgressDuration
            : maxMapRuntime;
          boolean reducerStalled =
            (((float)stallDuration / minShuffleRunDuration)
             >= MAX_ALLOWED_STALL_TIME_PERCENT);

          // kill if not healthy and has insufficient progress
          if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
               fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
              && !reducerHealthy
              && (!reducerProgressedEnough || reducerStalled)) {
            LOG.fatal("Shuffle failed with too many fetch failures " +
                "and insufficient progress!" +
                "Killing task " + getTaskID() + ".");
            umbilical.shuffleError(getTaskID(),
                "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
                + " bailing-out.");
          }
        }

        // back off exponentially until num_retries <= max_retries
        // back off by max_backoff/2 on subsequent failed attempts
        currentTime = System.currentTimeMillis();
        int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
          ? BACKOFF_INIT * (1 << (noFailedFetches - 1))
          : (this.maxBackoff * 1000 / 2);
        penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
        LOG.warn(reduceTask.getTaskID() + " adding host " +
            cr.getHost() + " to penalty box, next contact in " +
            (currentBackOff/1000) + " seconds");
      }
      uniqueHosts.remove(cr.getHost());
      numInFlight--;
    }
  }

  // all done, inform the copiers to exit
  exitGetMapEvents= true;
  try {
    getMapEventsThread.join();
    LOG.info("getMapsEventsThread joined.");
  } catch (Throwable t) {
    LOG.info("getMapsEventsThread threw an exception: " +
        StringUtils.stringifyException(t));
  }

  synchronized (copiers) {
    synchronized (scheduledCopies) {
      for (MapOutputCopier copier : copiers) {
        copier.interrupt();
      }
      copiers.clear();
    }
  }

  // copiers are done, exit and notify the waiting merge threads
  synchronized (mapOutputFilesOnDisk) {
    exitLocalFSMerge = true;
    mapOutputFilesOnDisk.notify();
  }

  ramManager.close();

  //Do a merge of in-memory files (if there are any)
  if (mergeThrowable == null) {
    try {
      // Wait for the on-disk merge to complete
      localFSMergerThread.join();
      LOG.info("Interleaved on-disk merge complete: " +
          mapOutputFilesOnDisk.size() + " files left.");

      //wait for an ongoing merge (if it is in flight) to complete
      inMemFSMergeThread.join();
      LOG.info("In-memory merge complete: " +
          mapOutputsFilesInMemory.size() + " files left.");
    } catch (Throwable t) {
      LOG.warn(reduceTask.getTaskID() +
          " Final merge of the inmemory files threw an exception: " +
          StringUtils.stringifyException(t));
      // check if the last merge generated an error
      if (mergeThrowable != null) {
        mergeThrowable = t;
      }
      return false;
    }
  }
  return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
}
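The penalty-box logic near the end of the copy loop is easier to follow in isolation. The sketch below reproduces only the backoff rule from the listing (double the wait for each failed fetch until the retry limit is reached, then use half of the maximum backoff) and the expiry check the scheduler applies to each host. PenaltyBoxSketch and its method names are invented for the example, and the constructor arguments used in main (4000 ms initial backoff, 300 s maximum, 6 retries) are placeholder values chosen for illustration, not values taken from the source.

```java
import java.util.HashMap;
import java.util.Map;

class PenaltyBoxSketch {
    private final Map<String, Long> penaltyEnd = new HashMap<>();
    private final int backoffInitMs;   // wait after the first failure
    private final int maxBackoffSec;   // upper bound, in seconds
    private final int maxRetries;      // failures allowed before the flat rate

    PenaltyBoxSketch(int backoffInitMs, int maxBackoffSec, int maxRetries) {
        this.backoffInitMs = backoffInitMs;
        this.maxBackoffSec = maxBackoffSec;
        this.maxRetries = maxRetries;
    }

    // failedFetches is 1 for the first failure. The wait doubles per failure
    // up to maxRetries, then stays at maxBackoff/2.
    long backoffMillis(int failedFetches) {
        return failedFetches <= maxRetries
            ? (long) backoffInitMs * (1L << (failedFetches - 1))
            : maxBackoffSec * 1000L / 2;
    }

    // Remember until when the host must not be contacted again.
    void recordFailure(String host, int failedFetches, long nowMs) {
        penaltyEnd.put(host, nowMs + backoffMillis(failedFetches));
    }

    // True while the host should be skipped; an expired entry is removed.
    boolean isPenalized(String host, long nowMs) {
        Long end = penaltyEnd.get(host);
        if (end == null) {
            return false;
        }
        if (nowMs < end) {
            return true;
        }
        penaltyEnd.remove(host);
        return false;
    }

    public static void main(String[] args) {
        PenaltyBoxSketch box = new PenaltyBoxSketch(4000, 300, 6);
        for (int failures = 1; failures <= 8; failures++) {
            System.out.println(failures + " failure(s): wait "
                + box.backoffMillis(failures) + " ms");
        }
    }
}
```

Penalizing by host rather than by map output means one slow or broken tasktracker is left alone for a while, while outputs on other hosts continue to be scheduled.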