JobTracker和TaskTracker分别启动之后(JobTracker启动流程源码级分析,TaskTracker启动过程源码级分析),taskTracker会通过心跳与JobTracker通信,并获取分配给它的任务。用户将作业提交到JobTracker之后,放入相应的数据结构中,静等被分配。mapreduce job提交流程源码级分析(三)这篇文章已经分析了用户提交作业的最后步骤,主要是构造作业对应的JobInProgress并加入jobs,告知所有的JobInProgressListener。
默认调度器创建了两个Listener:JobQueueJobInProgressListener和EagerTaskInitializationListener,用户提交的作业被封装成JobInProgress job加入这两个Listener。
一、JobQueueJobInProgressListener.jobAdded(job)会将此JobInProgress放入Map<JobSchedulingInfo, JobInProgress> jobQueue中。
二、EagerTaskInitializationListener.jobAdded(job)会将此JobInProgress放入List<JobInProgress> jobInitQueue中,然后调用resortInitQueue()对这个列表进行排序:先按优先级排序,优先级相同则按开始时间排序;然后唤醒在此对象监视器上等待的所有线程jobInitQueue.notifyAll()。EagerTaskInitializationListener.start()方法已经在调度器start时运行,会创建一个线程JobInitManager implements Runnable,它的run方法主要是监控jobInitQueue列表,一旦发现不为空就获取第一个JobInProgress,然后创建一个InitJob implements Runnable初始化线程并放入线程池ExecutorService threadPool(这个线程池在构建EagerTaskInitializationListener对象时由构造方法创建),InitJob线程的run方法就一句话ttm.initJob(job),调用的是JobTracker的initJob(job)方法对JIP进行初始化,实际调用JobInProgress.initTasks()对job进行初始化,initTasks()方法代码如下:
1 /** 2 * Construct the splits, etc. This is invoked from an async 3 * thread so that split-computation doesn't block anyone. 4 */ 5 //任务Task分两种: MapTask 和reduceTask,它们的管理对象都是TaskInProgress 。 6 public synchronized void initTasks() 7 throws IOException, KillInterruptedException, UnknownHostException { 8 if (tasksInited || isComplete()) { 9 return; 10 } 11 synchronized(jobInitKillStatus){ 12 if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) { 13 return; 14 } 15 jobInitKillStatus.initStarted = true; 16 } 17 18 LOG.info("Initializing " + jobId); 19 final long startTimeFinal = this.startTime; 20 // log job info as the user running the job 21 try { 22 userUGI.doAs(new PrivilegedExceptionAction<Object>() { 23 @Override 24 public Object run() throws Exception { 25 JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile, 26 startTimeFinal, hasRestarted()); 27 return null; 28 } 29 }); 30 } catch(InterruptedException ie) { 31 throw new IOException(ie); 32 } 33 34 // log the job priority 35 setPriority(this.priority); 36 37 // 38 // generate security keys needed by Tasks 39 // 40 generateAndStoreTokens(); 41 42 // 43 // read input splits and create a map per a split 44 // 45 TaskSplitMetaInfo[] splits = createSplits(jobId); 46 if (numMapTasks != splits.length) { 47 throw new IOException("Number of maps in JobConf doesn't match number of " + 48 "recieved splits for job " + jobId + "! 
" + 49 "numMapTasks=" + numMapTasks + ", #splits=" + splits.length); 50 } 51 numMapTasks = splits.length;//map task的个数就是input split的个数 52 53 // Sanity check the locations so we don't create/initialize unnecessary tasks 54 for (TaskSplitMetaInfo split : splits) { 55 NetUtils.verifyHostnames(split.getLocations()); 56 } 57 58 jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks); 59 jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks); 60 this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks); 61 this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks); 62 63 maps = new TaskInProgress[numMapTasks]; //为每个map tasks生成一个TaskInProgress来处理一个input split 64 for(int i=0; i < numMapTasks; ++i) { 65 inputLength += splits[i].getInputDataLength(); 66 maps[i] = new TaskInProgress(jobId, jobFile, //类型是map task 67 splits[i], 68 jobtracker, conf, this, i, numSlotsPerMap); 69 } 70 LOG.info("Input size for job " + jobId + " = " + inputLength 71 + ". Number of splits = " + splits.length); 72 73 // Set localityWaitFactor before creating cache 74 localityWaitFactor = 75 conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR); 76 /* 对于map task,将其放入nonRunningMapCache,是一个Map<Node,List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input 77 split所在的Node上。在此,Node代表一个datanode或者机架或者数据中 心。nonRunningMapCache将在JobTracker向TaskTracker分配map task的 时候使用。 78 */ 79 if (numMapTasks > 0) { 80 //通过createCache()方法为这些TaskInProgress对象产生一个未执行任务的Map缓存nonRunningMapCache。 81 //slave端的TaskTracker向master发送心跳时,就可以直接从这个cache中取任务去执行。 82 nonRunningMapCache = createCache(splits, maxLevel); 83 } 84 85 // set the launch time 86 this.launchTime = jobtracker.getClock().getTime(); 87 88 // 89 // Create reduce tasks 90 // 91 //其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建, 92 //缺省只创建1个Reduce任务。监控和调度Reduce任务的是TaskInProgress类,不过构造方法有所不同, 93 //TaskInProgress会根据不同参数分别创建具体的MapTask或者ReduceTask。同样地, 94 //initTasks()也会通过createCache()方法产生nonRunningReduceCache成员。 95 
this.reduces = new TaskInProgress[numReduceTasks]; 96 for (int i = 0; i < numReduceTasks; i++) { 97 reduces[i] = new TaskInProgress(jobId, jobFile, //这是reduce task 98 numMapTasks, i, 99 jobtracker, conf, this, numSlotsPerReduce); 100 /*reducetask放入nonRunningReduces,其将在JobTracker向TaskTracker分配reduce task的时候使用。*/ 101 nonRunningReduces.add(reduces[i]); 102 } 103 104 // Calculate the minimum number of maps to be complete before 105 // we should start scheduling reduces 106 completedMapsForReduceSlowstart = 107 (int)Math.ceil( 108 (conf.getFloat("mapred.reduce.slowstart.completed.maps", 109 DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 110 numMapTasks)); 111 112 // ... use the same for estimating the total output of all maps 113 resourceEstimator.setThreshhold(completedMapsForReduceSlowstart); 114 115 // create cleanup two cleanup tips, one map and one reduce. 116 //创建两个cleanup task,一个用来清理map,一个用来清理reduce. 117 cleanup = new TaskInProgress[2]; 118 119 // cleanup map tip. This map doesn't use any splits. Just assign an empty 120 // split. 121 TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT; 122 cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 123 jobtracker, conf, this, numMapTasks, 1); 124 cleanup[0].setJobCleanupTask(); 125 126 // cleanup reduce tip. 127 cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, 128 numReduceTasks, jobtracker, conf, this, 1); 129 cleanup[1].setJobCleanupTask(); 130 131 // create two setup tips, one map and one reduce. 132 //创建两个初始化 task,一个初始化map,一个初始化reduce. 133 setup = new TaskInProgress[2]; 134 135 // setup map tip. This map doesn't use any split. Just assign an empty 136 // split. 137 setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 138 jobtracker, conf, this, numMapTasks + 1, 1); 139 setup[0].setJobSetupTask(); 140 141 // setup reduce tip. 
142 setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, 143 numReduceTasks + 1, jobtracker, conf, this, 1); 144 setup[1].setJobSetupTask(); 145 146 synchronized(jobInitKillStatus){ 147 jobInitKillStatus.initDone = true; 148 if(jobInitKillStatus.killed) { 149 throw new KillInterruptedException("Job " + jobId + " killed in init"); 150 } 151 } 152 //JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中, 153 //然后再调用JobHistory.JobInfo.logInited()记录job的执行日志。 154 tasksInited = true; 155 JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 156 numMapTasks, numReduceTasks); 157 158 // Log the number of map and reduce tasks 159 LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks 160 + " map tasks and " + numReduceTasks + " reduce tasks."); 161 }