The main purpose of delay scheduling is to improve data locality and reduce data transfer across the network. For map tasks whose input data is not local, the scheduler delays scheduling them and instead assigns the slot to map tasks that do have locality.
The general idea of delay scheduling is as follows:
If the job finds a node-local map task, return that task; if none is found, delay scheduling: within nodeLocalityDelay, keep trying to find a node-local map task and return it;
Otherwise, once the wait exceeds nodeLocalityDelay, look for a rack-local map task and return it; if none is found, delay again: within rackLocalityDelay, keep trying to find a rack-local map task and return it;
Otherwise, once the wait exceeds nodeLocalityDelay + rackLocalityDelay, look for an off-switch map task and return it.
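The escalation above can be sketched as a simple threshold check. This is a minimal illustration, not the scheduler's real code: the enum, method name, and delay constants below are simplified stand-ins for the FairScheduler's per-job state.

```java
// Minimal sketch of the delay-scheduling escalation described above.
// LocalityLevel, allowedLevel() and the delay constants are simplified
// stand-ins, not the real FairScheduler API.
public class DelaySchedulingSketch {
    enum LocalityLevel { NODE, RACK, ANY }

    static final long NODE_LOCALITY_DELAY = 4500;  // ms, example value
    static final long RACK_LOCALITY_DELAY = 4500;  // ms, example value

    // Given how long the job has waited since its last local assignment,
    // return the loosest locality level it may launch a map task at.
    static LocalityLevel allowedLevel(long timeWaitedMs) {
        if (timeWaitedMs >= NODE_LOCALITY_DELAY + RACK_LOCALITY_DELAY) {
            return LocalityLevel.ANY;   // waited long enough: go off-switch
        } else if (timeWaitedMs >= NODE_LOCALITY_DELAY) {
            return LocalityLevel.RACK;  // relax to rack-local
        } else {
            return LocalityLevel.NODE;  // still insist on node-local
        }
    }

    public static void main(String[] args) {
        System.out.println(allowedLevel(0));      // NODE
        System.out.println(allowedLevel(5000));   // RACK
        System.out.println(allowedLevel(10000));  // ANY
    }
}
```

Note that the thresholds are cumulative: the job only goes off-switch after waiting through both delay windows in sequence.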
The main delay-scheduling variables in FairScheduler.java:

long nodeLocalityDelay;             // how long to wait for a node-local task before relaxing to rack-local
long rackLocalityDelay;             // how long to additionally wait for a rack-local task before going off-switch
boolean skippedAtLastHeartbeat;     // whether the job was skipped (delayed) at the last heartbeat
long timeWaitedForLocalMap;         // time waited since the last map task was assigned
LocalityLevel lastMapLocalityLevel; // locality level of the last map task assigned

nodeLocalityDelay = rackLocalityDelay =
    Math.min(15000, (long) (1.5 * jobTracker.getNextHeartbeatInterval()));
In the Fair Scheduler, each job maintains two variables to implement delay scheduling: the locality level of the last map task scheduled (lastMapLocalityLevel) and the time the job has waited since it was last skipped (timeWaitedForLocalMap). The workflow is as follows (the actual work is done in the getAllowedLocalityLevel() method of FairScheduler.java):
/**
 * Get the maximum locality level at which a given job is allowed to
 * launch tasks, based on how long it has been waiting for local tasks.
 * This is used to implement the "delay scheduling" feature of the Fair
 * Scheduler for optimizing data locality.
 * If the job has no locality information (e.g. it does not use HDFS), this
 * method returns LocalityLevel.ANY, allowing tasks at any level.
 * Otherwise, the job can only launch tasks at its current locality level
 * or lower, unless it has waited at least nodeLocalityDelay or
 * rackLocalityDelay milliseconds depending on the current level. If it
 * has waited (nodeLocalityDelay + rackLocalityDelay) milliseconds,
 * it can go to any level.
 */
protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
    long currentTime) {
  JobInfo info = infos.get(job);
  if (info == null) { // Job not in infos (shouldn't happen)
    LOG.error("getAllowedLocalityLevel called on job " + job
        + ", which does not have a JobInfo in infos");
    return LocalityLevel.ANY;
  }
  if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information
    return LocalityLevel.ANY;
  }
  // Don't wait for locality if the job's pool is starving for maps
  Pool pool = poolMgr.getPool(job);
  PoolSchedulable sched = pool.getMapSchedulable();
  long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName());
  long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
  if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout ||
      currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
    eventLog.log("INFO", "No delay scheduling for "
        + job.getJobID() + " because it is being starved");
    return LocalityLevel.ANY;
  }
  // In the common case, compute locality level based on time waited
  switch (info.lastMapLocalityLevel) {
  case NODE: // Last task launched was node-local
    if (info.timeWaitedForLocalMap >= nodeLocalityDelay + rackLocalityDelay)
      return LocalityLevel.ANY;
    else if (info.timeWaitedForLocalMap >= nodeLocalityDelay)
      return LocalityLevel.RACK;
    else
      return LocalityLevel.NODE;
  case RACK: // Last task launched was rack-local
    if (info.timeWaitedForLocalMap >= rackLocalityDelay)
      return LocalityLevel.ANY;
    else
      return LocalityLevel.RACK;
  default: // Last task was non-local; can launch anywhere
    return LocalityLevel.ANY;
  }
}
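The method above only reads the two per-job variables; the surrounding scheduler code updates them on each heartbeat. A hedged sketch of that bookkeeping, assuming the semantics described in this article (the JobState class and method names below are hypothetical; the real bookkeeping lives in FairScheduler's JobInfo and its task-assignment path):

```java
// Hypothetical sketch of the per-job delay-scheduling bookkeeping.
// JobState, onSkipped() and onTaskLaunched() are illustrative names,
// not the real FairScheduler API.
public class JobState {
    enum Level { NODE, RACK, ANY }

    long timeWaitedForLocalMap = 0;       // ms waited since last assignment
    Level lastMapLocalityLevel = Level.NODE;
    boolean skippedAtLastHeartbeat = false;

    // The job was skipped at a heartbeat: accumulate the wait time so the
    // allowed locality level can relax once a delay threshold is crossed.
    void onSkipped(long heartbeatIntervalMs) {
        skippedAtLastHeartbeat = true;
        timeWaitedForLocalMap += heartbeatIntervalMs;
    }

    // A map task was launched at the given level: record the level and
    // reset the wait timer, so the job must wait again before relaxing.
    void onTaskLaunched(Level level) {
        skippedAtLastHeartbeat = false;
        lastMapLocalityLevel = level;
        timeWaitedForLocalMap = 0;
    }

    public static void main(String[] args) {
        JobState s = new JobState();
        s.onSkipped(3000);
        s.onSkipped(3000);
        System.out.println(s.timeWaitedForLocalMap);  // 6000
        s.onTaskLaunched(Level.RACK);
        System.out.println(s.timeWaitedForLocalMap);  // 0
    }
}
```

With a 3000 ms heartbeat, nodeLocalityDelay would be min(15000, 1.5 * 3000) = 4500 ms, so in this sketch the job would be allowed to relax to rack-local after two skipped heartbeats.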