在监听器初始化Job、JobTracker相应TaskTracker心跳、调度器分配task源码级分析中我们分析的Tasktracker发送心跳的机制,这一节我们分析TaskTracker接受JobTracker的响应信息后的工作内容。

  TaskTracker中的transmitHeartBeat方法通过调用JobTracker.heartbeat方法获得心跳的响应信息HeartbeatResponse,然后返回给TaskTracker.offerService()方法。HeartbeatResponse中包含了以下几个重要的信息:

  (1)可能包含一个cleanup task或者一个setup task,一个心跳只能包含一个这种类型的task。优先考虑map的cleanup,然后map的setup,然后reduce的cleanup,然后reduce的setup;

  (2)调度器分配的MapTask(可以有多个,最多有一个非本地的Map(而且一旦有此种类的Map,则会停止分配Map,返回Map列表))或者ReduceTask(一次心跳最多分配1个);

  (3)TaskTracker上对应的一些应该被Kill的Task;

  (4)TaskTracker上对应的一些应该被Kill的Job;

  (5)TaskTracker上可以保存数据的Task;

  (6)下一次的心跳间隔;

  (7)如果JobTracker重启了,还会有需要恢复的Job列表;

  (8)还有就是只返回重启命令ReinitTrackerAction。如果TaskTracker不是第一次发送心跳链接JobTracker,且JobTracker也没重启,并且没有此TaskTracker上一次心跳信息,说明可能存在严重的问题,因此让此tasktracker重新初始化。

  TaskTracker.offerService()方法是一个while循环,始终是执行等待心跳时间发送心跳,接受响应信息,分析响应信息中的任务。接受到响应信息HeartbeatResponse之后:

  一、获取恢复作业列表(如果响应信息中有要恢复的作业),重置各个Job的状态,然后将所有正在运行的处于SHUFFLE阶段的Reduce Task回滚放入shouldReset中;

  二、然后调用HeartbeatResponse的getActions()函数获得JobTracker传过来的所有指令即一个TaskTrackerAction数组:TaskTrackerAction[] actions = heartbeatResponse.getActions()。

  三、如果actions是重新初始化命令则会直接返回State.STALE到run()中,会跳出内层while循环,然后外层while继续执行,调用initialize()方法进行初始化,并再次执行offerService()。

  四、重置心跳间隔heartbeatInterval = heartbeatResponse.getHeartbeatInterval()

  五、置justStarted、justInited都为false表示已经启动服务,并已连接JobTracker

  六、遍历actions数组:

  (1)如果是LaunchTaskAction,则调用addToTaskQueue((LaunchTaskAction)action)将Action添加到任务队列中,加入TaskLauncher线程的执行队列。addToTaskQueue方法会根据LaunchTaskAction的类型将这个action加入mapLauncher或者reduceLauncher,这两个launcher都是TaskLauncher extends Thread的对象,这两个线程对象都是在initialize()时初始化,会通过addToTaskQueue(action)方法将action加入 List<TaskInProgress> tasksToLaunch列表,注意这个TaskInProgress是TaskTracker.TaskInProgress,而非MapRed包中的 TaskInProgress类。TaskLauncher类的run方法会始终监控tasksToLaunch,一旦发现有新的任务,就获取第一个task,并检查是否可以运行此task等待有足够的slot来运行此task,还要判断(canBeLaunched()方法)此task的运行状态必须是UNASSIGNED、FAILED_UNCLEAN、KILLED_UNCLEAN三者之一才可以执行。最终通过startNewTask(tip)方法来执行。

  (2)如果是CommitTaskAction,就加入commitResponses.add(commitAction.getTaskID()),这类任务指的是处理完数据之后,将最终结果从临时目录转移到最终目录的过程,只有将输出结果直接写到HDFS上的任务才会经历这个过程,只有两类任务:reduce task和map-only类型的map task。不管是map task、Reduce task、setup task、cleanup job task、cleanup task task执行完后都会调用done(umbilical, reporter)该方法会通过层层调用找到commitResponses等待JobTracker的commit命令。

  (3)其他则直接加入tasksToCleanup.put(action),包括杀死任务或作业。taskCleanupThread线程会始终监控tasksToCleanup队列,从中take一个TaskTrackerAction action,如果这个action是KillJobAction类型,就调用方法purgeJob((KillJobAction) action)来处理,这个方法会从runningJobs获取对应的RunningJob,如果允许清理文件会将这个job对应的文件都删除,将这个RunningJob对应的所有task清空;如果这个action是KillTaskAction,就调用processKillTaskAction((KillTaskAction) action)来处理:会从tasks中获取对应的TaskInProgress,然后从runningJobs中找到对应的RunningJob,并从RunningJob中的task列表中删除这个task。

  七、markUnresponsiveTasks(),杀死一定时间没没有汇报进度的task

  八、killOverflowingTasks(),当剩余磁盘空间小于mapred.local.dir.minspacekill(默认为0)时,寻找合适的任务将其杀掉以释放空间

  九、到这已经做了清理和恢复工作,所以如果acceptNewTasks==false并且此tasktracker处于空闲,就将acceptNewTasks=true,可以接受新的任务了

  十、checkJettyPort(server.getPort()),官方给的解释是:为了谨慎,因为有些情况获得的jetty端口不一致。检查是如果端口号小于0,shuttingDown = true这样会使得run中的两层循环、offerService()中的while循环都退出,致使main()结束运行,该tasktracker关闭。

  上面的六中介绍了各种类型的任务,其中map task和reduce task都是通过startNewTask(tip)方法来启动的。这个方法对每个TaskTracker.TaskInProgress都会启动一个单独的线程来执行,这个线程的run方法主要工作是,一旦运行过程出错,异常处理会将这个tip杀死,并清理相对于的一些数据。:  

1       RunningJob rjob = localizeJob(tip);    
2           tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString());
3           // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
4           launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); //执行task

  (1)localizeJob(tip)方法是确保首先对作业进行本地化,即第一个tip要对作业进行本地化,后续的tip只对任务本地化。会调用initializeJob(t, rjob, ttAddr)方法对作业进行本地化,会从HDFS下载JobToken和job.xml到本地,然后通过TaskController.initializeJob方法完成剩余的工作,默认是DefaultTaskController,这个initializeJob方法会在本地创建一些目录,并下载job.jar到本地,创建job-acls.xml保存作业访问控制权限等信息。在这个方法中除了作业初始化其他的任务初始化基本没做什么工作。

  (2)launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob)方法来执行,会调用TaskTracker.TaskInProgress的launchTask()函数启动Task,如果这个task的状态是UNASSIGNED、FAILED_UNCLEAN、KILLED_UNCLEAN三者之一,就调用方法对localizeTask(task)对task做一些配置信息,然后创建一个TaskRunner,如果是map类型的任务会创建MapTaskRunner,如果是reduce类型的任务会创建ReduceTaskRunner,但任务的启动最终均是其父类TaskRunner.run()方法完成。启动TaskRunner。TaskRunner是一个线程类,其run()方法代码如下:  

  1   @Override
  2   public final void run() {
  3     String errorInfo = "Child Error";
  4     try {
  5       
  6       //before preparing the job localize 
  7       //all the archives
  8       TaskAttemptID taskid = t.getTaskID();
  9       final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
 10       //simply get the location of the workDir and pass it to the child. The
 11       //child will do the actual dir creation
 12       final File workDir =
 13       new File(new Path(localdirs[rand.nextInt(localdirs.length)], 
 14           TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), 
 15           taskid.toString(),
 16           t.isTaskCleanupTask())).toString());
 17       
 18       String user = tip.getUGI().getUserName();
 19       
 20       // Set up the child task's configuration. After this call, no localization
 21       // of files should happen in the TaskTracker's process space. Any changes to
 22       // the conf object after this will NOT be reflected to the child.
 23       // setupChildTaskConfiguration(lDirAlloc);
 24 
 25       if (!prepare()) {
 26         return;
 27       }
 28       
 29       // Accumulates class paths for child.
 30       List<String> classPaths = getClassPaths(conf, workDir,
 31                                               taskDistributedCacheManager);
 32 
 33       long logSize = TaskLog.getTaskLogLength(conf);
 34       
 35       //  Build exec child JVM args.
 36       Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);
 37       
 38       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 39 
 40       // set memory limit using ulimit if feasible and necessary ...
 41       String setup = getVMSetupCmd();
 42       // Set up the redirection of the task's stdout and stderr streams
 43       File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
 44       File stdout = logFiles[0];
 45       File stderr = logFiles[1];
 46       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
 47                  stderr);
 48       
 49       Map<String, String> env = new HashMap<String, String>();
 50       errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
 51                                    logSize);
 52       
 53       // flatten the env as a set of export commands
 54       List <String> setupCmds = new ArrayList<String>();
 55       for(Entry<String, String> entry : env.entrySet()) {
 56         StringBuffer sb = new StringBuffer();
 57         sb.append("export ");
 58         sb.append(entry.getKey());
 59         sb.append("=\"");
 60         sb.append(entry.getValue());
 61         sb.append("\"");
 62         setupCmds.add(sb.toString());
 63       }
 64       setupCmds.add(setup);
 65       
 66       launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
 67       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
 68       if (exitCodeSet) {
 69         if (!killed && exitCode != 0) {
 70           if (exitCode == 65) {
 71             tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
 72           }
 73           throw new IOException("Task process exit with nonzero status of " +
 74               exitCode + ".");
 75         }
 76       }
 77     } catch (FSError e) {
 78       LOG.fatal("FSError", e);
 79       try {
 80         tracker.fsErrorInternal(t.getTaskID(), e.getMessage());
 81       } catch (IOException ie) {
 82         LOG.fatal(t.getTaskID()+" reporting FSError", ie);
 83       }
 84     } catch (Throwable throwable) {
 85       LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
 86       Throwable causeThrowable = new Throwable(errorInfo, throwable);
 87       ByteArrayOutputStream baos = new ByteArrayOutputStream();
 88       causeThrowable.printStackTrace(new PrintStream(baos));
 89       try {
 90         tracker.reportDiagnosticInfoInternal(t.getTaskID(), baos.toString());
 91       } catch (IOException e) {
 92         LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
 93       }
 94     } finally {
 95       
 96       // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
 97       // *false* since the task has either
 98       // a) SUCCEEDED - which means commit has been done
 99       // b) FAILED - which means we do not need to commit
100       tip.reportTaskFinished(false);
101     }
102   }
View Code

相关文章: