JobTracker和TaskTracker分别启动之后(参见JobTracker启动流程源码级分析、TaskTracker启动过程源码级分析),TaskTracker会通过心跳与JobTracker通信,并获取分配给它的任务。用户将作业提交到JobTracker之后,放入相应的数据结构中,静等被分配。mapreduce job提交流程源码级分析(三)这篇文章已经分析了用户提交作业的最后步骤,主要是构造作业对应的JobInProgress并加入jobs,告知所有的JobInProgressListener。

  默认调度器创建了两个Listener:JobQueueJobInProgressListener和EagerTaskInitializationListener,用户提交的作业被封装成JobInProgress job加入这两个Listener。

  一、JobQueueJobInProgressListener.jobAdded(job)会将此JobInProgress放入Map<JobSchedulingInfo, JobInProgress> jobQueue中。 

  二、EagerTaskInitializationListener.jobAdded(job)会将此JobInProgress放入List&lt;JobInProgress&gt; jobInitQueue中,然后调用resortInitQueue()对这个列表进行排序:先按优先级排序,优先级相同则按开始时间排序;然后唤醒在此对象监视器上等待的所有线程jobInitQueue.notifyAll()。EagerTaskInitializationListener.start()方法已经在调度器start时运行,会创建一个线程JobInitManager implements Runnable,它的run方法主要是监控jobInitQueue列表,一旦发现不为空就获取第一个JobInProgress,然后创建一个InitJob implements Runnable初始化线程并放入线程池ExecutorService threadPool(这个线程池在构建EagerTaskInitializationListener对象时由构造方法创建),InitJob线程的run方法就一句话ttm.initJob(job),调用的是JobTracker的initJob(job)方法对JIP进行初始化,实际调用JobInProgress.initTasks()对job进行初始化,initTasks()方法代码如下:

  1  /**
  2    * Construct the splits, etc.  This is invoked from an async
  3    * thread so that split-computation doesn't block anyone.
  4    */
  5   // Tasks come in two kinds, MapTask and ReduceTask; both are managed through TaskInProgress objects.
  6   public synchronized void initTasks() 
  7   throws IOException, KillInterruptedException, UnknownHostException {
  8     if (tasksInited || isComplete()) {
  9       return;
 10     }
 11     synchronized(jobInitKillStatus){
 12       if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
 13         return;
 14       }
 15       jobInitKillStatus.initStarted = true;
 16     }
 17 
 18     LOG.info("Initializing " + jobId);
 19     final long startTimeFinal = this.startTime;
 20     // log job info as the user running the job
 21     try {
 22     userUGI.doAs(new PrivilegedExceptionAction<Object>() {
 23       @Override
 24       public Object run() throws Exception {
 25         JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile, 
 26             startTimeFinal, hasRestarted());
 27         return null;
 28       }
 29     });
 30     } catch(InterruptedException ie) {
 31       throw new IOException(ie);
 32     }
 33     
 34     // log the job priority
 35     setPriority(this.priority);
 36     
 37     //
 38     // generate security keys needed by Tasks
 39     //
 40     generateAndStoreTokens();
 41     
 42     //
 43     // read input splits and create a map per a split
 44     //
 45     TaskSplitMetaInfo[] splits = createSplits(jobId);
 46     if (numMapTasks != splits.length) {
       // NOTE(review): "recieved" in the message below is a typo for "received";
       // it is a runtime string, so it is kept byte-for-byte here.
 47       throw new IOException("Number of maps in JobConf doesn't match number of " +
 48               "recieved splits for job " + jobId + "! " +
 49               "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
 50     }
 51     numMapTasks = splits.length;// the number of map tasks equals the number of input splits
 52 
 53     // Sanity check the locations so we don't create/initialize unnecessary tasks
 54     for (TaskSplitMetaInfo split : splits) {
 55       NetUtils.verifyHostnames(split.getLocations());
 56     }
 57     
 58     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
 59     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
 60     this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
 61     this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);
 62 
 63     maps = new TaskInProgress[numMapTasks]; // one TaskInProgress per map task, each handling one input split 
 64     for(int i=0; i < numMapTasks; ++i) {
 65       inputLength += splits[i].getInputDataLength();
 66       maps[i] = new TaskInProgress(jobId, jobFile,         // this constructor overload builds a map-type TIP
 67                                    splits[i], 
 68                                    jobtracker, conf, this, i, numSlotsPerMap);
 69     }
 70     LOG.info("Input size for job " + jobId + " = " + inputLength
 71         + ". Number of splits = " + splits.length);
 72 
 73     // Set localityWaitFactor before creating cache
 74     localityWaitFactor = 
 75       conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
 76     /* Map tasks are placed into nonRunningMapCache, a Map<Node, List<TaskInProgress>>: each map task
 77     should be scheduled onto the Node holding its input split. A Node here may represent a datanode, a rack, or a data center. nonRunningMapCache is consulted when the JobTracker hands map tasks to TaskTrackers.
 78     */ 
 79     if (numMapTasks > 0) { 
 80         // createCache() builds nonRunningMapCache, the cache of not-yet-running map TaskInProgress objects.
 81         // When a slave-side TaskTracker heartbeats to the master, tasks are taken directly from this cache.
 82       nonRunningMapCache = createCache(splits, maxLevel);
 83     }
 84         
 85     // set the launch time
 86     this.launchTime = jobtracker.getClock().getTime();
 87 
 88     //
 89     // Create reduce tasks
 90     //
 91     //Next, JobInProgress creates the monitoring objects for the reduce side. This is simpler: the count
 92     //comes straight from the JobConf (by default only 1 reduce task is created). Reduces are likewise
 93     //monitored and scheduled via TaskInProgress, built through a different constructor overload that
 94     //creates the concrete MapTask or ReduceTask as appropriate. Unlike the maps (which go through
 95     //createCache()), the reduces are simply added to the nonRunningReduces set below.
 95     this.reduces = new TaskInProgress[numReduceTasks];
 96     for (int i = 0; i < numReduceTasks; i++) {
 97       reduces[i] = new TaskInProgress(jobId, jobFile,     // reduce-type TIP
 98                                       numMapTasks, i, 
 99                                       jobtracker, conf, this, numSlotsPerReduce);
100       /* Each reduce task goes into nonRunningReduces, consulted when the JobTracker hands reduce tasks to TaskTrackers. */ 
101       nonRunningReduces.add(reduces[i]);
102     }
103 
104     // Calculate the minimum number of maps to be complete before 
105     // we should start scheduling reduces
106     completedMapsForReduceSlowstart = 
107       (int)Math.ceil(
108           (conf.getFloat("mapred.reduce.slowstart.completed.maps", 
109                          DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
110            numMapTasks));
111     
112     // ... use the same for estimating the total output of all maps
113     resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
114     
115     // create cleanup two cleanup tips, one map and one reduce.
116   // Create two cleanup tasks: one cleans up the map side, the other the reduce side.
117     cleanup = new TaskInProgress[2];

118 
119     // cleanup map tip. This map doesn't use any splits. Just assign an empty
120     // split.
121     TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
122     cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
123             jobtracker, conf, this, numMapTasks, 1);
124     cleanup[0].setJobCleanupTask();
125 
126     // cleanup reduce tip.
127     cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
128                        numReduceTasks, jobtracker, conf, this, 1);
129     cleanup[1].setJobCleanupTask();
130 
131     // create two setup tips, one map and one reduce.
132     // Create two setup tasks: one map-side setup, one reduce-side setup.
133     setup = new TaskInProgress[2];
134 
135     // setup map tip. This map doesn't use any split. Just assign an empty
136     // split.
137     setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
138             jobtracker, conf, this, numMapTasks + 1, 1);
139     setup[0].setJobSetupTask();
140 
141     // setup reduce tip.
142     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
143                        numReduceTasks + 1, jobtracker, conf, this, 1);
144     setup[1].setJobSetupTask();
145     
146     synchronized(jobInitKillStatus){
147       jobInitKillStatus.initDone = true;
148       if(jobInitKillStatus.killed) {
149         throw new KillInterruptedException("Job " + jobId + " killed in init");
150       }
151     }
152     //Once every TaskInProgress is created, mark the job as initialized and record the init event in
153     //the job history via JobHistory.JobInfo.logInited(). (The original note also mentions a JobStatus
154     //being built to record the job as running — that happens outside this snippet; verify in caller.)
154     tasksInited = true;
155     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
156                                  numMapTasks, numReduceTasks);
157     
158    // Log the number of map and reduce tasks
159    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
160             + " map tasks and " + numReduceTasks + " reduce tasks.");
161   }
View Code

相关文章: