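The main purpose of delay scheduling is to improve data locality and reduce the amount of data transferred over the network. For MapTasks whose input data is not local, the scheduler postpones scheduling them and instead gives the slot to MapTasks that do have locality.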

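  The basic idea of delay scheduling is as follows:

  If the job finds a node-local MapTask, that task is returned. If not, the job is delayed: for up to nodeLocalityDelay, the scheduler keeps trying to find a node-local MapTask and returns it if one turns up.

  Otherwise, once the job has waited at least nodeLocalityDelay, the scheduler looks for a rack-local (or better) MapTask and returns it. If none is found, the job is delayed again: for up to a further rackLocalityDelay, the scheduler keeps trying to find a rack-local MapTask and returns it if one turns up.

  Otherwise, once the job has waited at least nodeLocalityDelay + rackLocalityDelay in total, the scheduler takes an off-switch MapTask and returns it.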
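The three steps above can be sketched as a small standalone rule. This is a simplified model, not the Fair Scheduler code: the class `DelaySchedulingRule`, the enum `Locality`, and the methods `allowedLevel()` and `pickTask()` are hypothetical, the delays are plain numbers, and "picking" a task just means choosing the most local pending task whose level is within the allowed level (a `null` result means the job is skipped for this slot).

```java
import java.util.List;

public class DelaySchedulingRule {
    // Locality levels ordered from most local to least local; compareTo() follows this order.
    enum Locality { NODE, RACK, ANY }

    // Maximum level a job may use after waiting waitedMs, for a job that is
    // currently at node level (the three steps described above).
    static Locality allowedLevel(long waitedMs, long nodeLocalityDelay, long rackLocalityDelay) {
        if (waitedMs >= nodeLocalityDelay + rackLocalityDelay) return Locality.ANY;
        if (waitedMs >= nodeLocalityDelay) return Locality.RACK;
        return Locality.NODE;
    }

    // Choose the most local candidate not worse than the allowed level;
    // returns null when nothing qualifies, i.e. the job is delayed.
    static Locality pickTask(List<Locality> candidates, Locality allowed) {
        Locality best = null;
        for (Locality c : candidates) {
            if (c.compareTo(allowed) <= 0 && (best == null || c.compareTo(best) < 0)) {
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Hypothetical pending tasks for the offered node: no node-local one is available.
        List<Locality> pending = List.of(Locality.ANY, Locality.RACK);
        long delay = 5000; // hypothetical value used for both delays
        for (long waited : new long[] {0, 5000, 10000}) {
            Locality allowed = allowedLevel(waited, delay, delay);
            System.out.println(waited + " ms -> allowed " + allowed + ", picked " + pickTask(pending, allowed));
        }
    }
}
```

Running `main` prints `picked null` at 0 ms (the job is skipped), then `picked RACK` at both 5000 ms and 10000 ms: even when every level is allowed, the most local available task is still preferred over an off-switch one.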

 

  The main variables related to delay scheduling in FairScheduler.java:

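long nodeLocalityDelay;              // how long to wait for a node-local MapTask before accepting rack-local
long rackLocalityDelay;              // how much longer to wait for a rack-local MapTask before accepting off-switch
boolean skippedAtLastHeartbeat;      // whether the job was skipped (delayed) at the last heartbeat
long timeWaitedForLocalMap;          // time waited for a local MapTask since the last MapTask was launched
LocalityLevel lastMapLocalityLevel;  // locality level of the last MapTask launched
// both delays are derived from the JobTracker heartbeat interval, capped at 15000 ms
nodeLocalityDelay = rackLocalityDelay =
    Math.min(15000, (long) (1.5 * jobTracker.getNextHeartbeatInterval()));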
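To make the delay formula concrete, the snippet below evaluates the same arithmetic, min(15000, 1.5 × heartbeat interval), for two example intervals. The class `DefaultLocalityDelay`, the constant `MAX_DELAY_MS`, and the interval values are hypothetical; the heartbeat interval is passed in directly instead of calling `jobTracker.getNextHeartbeatInterval()`.

```java
public class DefaultLocalityDelay {
    // Cap taken from the formula above (15000 ms); the constant name is hypothetical.
    static final long MAX_DELAY_MS = 15000;

    // Same arithmetic as the quoted assignment: min(15000, 1.5 * heartbeat interval).
    static long defaultDelay(long heartbeatIntervalMs) {
        return Math.min(MAX_DELAY_MS, (long) (1.5 * heartbeatIntervalMs));
    }

    public static void main(String[] args) {
        // Hypothetical heartbeat intervals in milliseconds.
        for (long interval : new long[] {3000, 12000}) {
            long d = defaultDelay(interval);
            // With nodeLocalityDelay == rackLocalityDelay == d, a job whose last map was
            // node-local can fall back to off-switch only after 2 * d ms of waiting.
            System.out.println("interval=" + interval + " ms, delay=" + d
                    + " ms, off-switch after " + (2 * d) + " ms");
        }
    }
}
```

A 3000 ms heartbeat gives a 4500 ms delay (9000 ms before off-switch), while a 12000 ms heartbeat would give 18000 ms and is therefore capped at 15000 ms (30000 ms before off-switch). Tying the delay to 1.5 heartbeats means a waiting job normally sees at least one more heartbeat at each level before relaxing.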

  

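  In the fair scheduler, each job maintains two variables to implement delay scheduling: the locality level of the last MapTask launched (lastMapLocalityLevel) and the time the job has waited since it started being skipped (timeWaitedForLocalMap). The logic is as follows (the actual work is done in the getAllowedLocalityLevel() method of FairScheduler.java):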

  /**
   * Get the maximum locality level at which a given job is allowed to
   * launch tasks, based on how long it has been waiting for local tasks.
   * This is used to implement the "delay scheduling" feature of the Fair
   * Scheduler for optimizing data locality.
   * If the job has no locality information (e.g. it does not use HDFS), this
   * method returns LocalityLevel.ANY, allowing tasks at any level.
   * Otherwise, the job can only launch tasks at its current locality level
   * or lower, unless it has waited at least nodeLocalityDelay or
   * rackLocalityDelay milliseconds, depending on the current level. If it
   * has waited (nodeLocalityDelay + rackLocalityDelay) milliseconds,
   * it can go to any level.
   */
  protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
      long currentTime) {
    JobInfo info = infos.get(job);
    if (info == null) { // Job not in infos (shouldn't happen)
      LOG.error("getAllowedLocalityLevel called on job " + job
          + ", which does not have a JobInfo in infos");
      return LocalityLevel.ANY;
    }
    if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information
      return LocalityLevel.ANY;
    }
    // Don't wait for locality if the job's pool is starving for maps
    Pool pool = poolMgr.getPool(job);
    PoolSchedulable sched = pool.getMapSchedulable();
    long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName());
    long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
    if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout ||
        currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
      eventLog.log("INFO", "No delay scheduling for "
          + job.getJobID() + " because it is being starved");
      return LocalityLevel.ANY;
    }
    // In the common case, compute locality level based on time waited
    switch(info.lastMapLocalityLevel) {
    case NODE: // Last task launched was node-local
      if (info.timeWaitedForLocalMap >=
          nodeLocalityDelay + rackLocalityDelay)
        return LocalityLevel.ANY;
      else if (info.timeWaitedForLocalMap >= nodeLocalityDelay)
        return LocalityLevel.RACK;
      else
        return LocalityLevel.NODE;
    case RACK: // Last task launched was rack-local
      if (info.timeWaitedForLocalMap >= rackLocalityDelay)
        return LocalityLevel.ANY;
      else
        return LocalityLevel.RACK;
    default: // Last task was non-local; can launch anywhere
      return LocalityLevel.ANY;
    }
  }
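The per-job bookkeeping described above (skippedAtLastHeartbeat, timeWaitedForLocalMap, lastMapLocalityLevel) can be traced over a few heartbeats with the sketch below. It is a simplified model under stated assumptions: the time between heartbeats is charged to a job only if it was skipped at the previous heartbeat, launching a MapTask resets the wait to zero, and the HDFS and pool-starvation checks of getAllowedLocalityLevel() are left out. The class `DelaySchedulingTrace`, the methods `allowed()` and `heartbeat()`, and the delay values are hypothetical.

```java
public class DelaySchedulingTrace {
    // Ordered from most local to least local.
    enum LocalityLevel { NODE, RACK, ANY }

    // Per-job state, named after the fields discussed above.
    static class JobInfo {
        LocalityLevel lastMapLocalityLevel = LocalityLevel.NODE;
        long timeWaitedForLocalMap = 0;
        boolean skippedAtLastHeartbeat = false;
    }

    // Hypothetical delays (ms), e.g. the default for a 3000 ms heartbeat.
    static final long NODE_DELAY = 4500, RACK_DELAY = 4500;

    // Same switch as getAllowedLocalityLevel(), without the HDFS and starvation checks.
    static LocalityLevel allowed(JobInfo info) {
        switch (info.lastMapLocalityLevel) {
            case NODE:
                if (info.timeWaitedForLocalMap >= NODE_DELAY + RACK_DELAY) return LocalityLevel.ANY;
                if (info.timeWaitedForLocalMap >= NODE_DELAY) return LocalityLevel.RACK;
                return LocalityLevel.NODE;
            case RACK:
                return info.timeWaitedForLocalMap >= RACK_DELAY ? LocalityLevel.ANY : LocalityLevel.RACK;
            default:
                return LocalityLevel.ANY;
        }
    }

    // One heartbeat: charge elapsed time if the job was skipped last time, then either
    // launch the best available task (returning its level) or skip the job (returning null).
    static LocalityLevel heartbeat(JobInfo info, long elapsedMs, LocalityLevel bestAvailable) {
        if (info.skippedAtLastHeartbeat) {
            info.timeWaitedForLocalMap += elapsedMs;
            info.skippedAtLastHeartbeat = false;
        }
        if (bestAvailable.compareTo(allowed(info)) <= 0) {
            info.lastMapLocalityLevel = bestAvailable;
            info.timeWaitedForLocalMap = 0; // a launch resets the wait
            return bestAvailable;
        }
        info.skippedAtLastHeartbeat = true;
        return null;
    }

    public static void main(String[] args) {
        JobInfo info = new JobInfo();
        // Every heartbeat (3000 ms apart) only offers a slot for which the job has an off-switch task.
        for (int i = 1; i <= 4; i++) {
            LocalityLevel launched = heartbeat(info, 3000, LocalityLevel.ANY);
            System.out.println("heartbeat " + i + ": " + (launched == null ? "skipped" : "launched " + launched));
        }
    }
}
```

With both delays at 4500 ms, the job is skipped at heartbeats 1 to 3 (waited 0, 3000 and 6000 ms; allowed NODE, NODE, RACK) and launches an off-switch task at heartbeat 4, when the accumulated 9000 ms reaches nodeLocalityDelay + rackLocalityDelay. After that, lastMapLocalityLevel is ANY, so the default branch lets the job launch anywhere until it places a more local task again.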
