TaskTracker节点向JobTracker汇报当前节点的运行时信息时,是将运行状态信息随心跳报告一起发送给JobTracker的,主要包括TaskTracker的基本信息、节点资源使用信息、各任务状态等。所有信息被序列化为TaskTrackerStatus实例对象。每次发送心跳报告的时候,如果上一次心跳已成功送达(即status为null),会重新构造一个Status对象,并重置这些信息;否则会重发上一次的status。另外需要注意的是每次发送的status对象的大小是不一定的,因为很多信息的发送是有时间间隔的。这些操作主要位于方法transmitHeartBeat的上半部分代码:

  1 HeartbeatResponse transmitHeartBeat(long now) throws IOException {   
  2  // Decide whether task counters are sent with this heartbeat; the interval is COUNTER_UPDATE_INTERVAL (60s per the original note, not configurable)
  3     boolean sendCounters;
  4     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
  5       sendCounters = true;
  6       previousUpdate = now;
  7     }
  8     else {
  9       sendCounters = false;
 10     }
 11 
 12     // 
 13     // Check if the last heartbeat got through... 
 14     // if so then build the heartbeat information for the JobTracker;
 15     // else resend the previous status information.
 16     //
 17     if (status == null) {
 18       synchronized (this) {
 19         status = new TaskTrackerStatus(taskTrackerName, localHostname, 
 20                                        httpPort, 
 21                                        cloneAndResetRunningTaskStatuses(
 22                                          sendCounters), 
 23                                        failures, 
 24                                        maxMapSlots,
 25                                        maxReduceSlots); 
 26       }
 27     } else {
 28       LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
 29                "' with reponseId '" + heartbeatResponseId);
 30     }
 31       
 32     //
 33     // Check if we should ask for a new Task
 34     // and compute the node's resource usage information
 35     boolean askForNewTask;
 36     long localMinSpaceStart;
 37     synchronized (this) {
 38       askForNewTask = 
 39         ((status.countOccupiedMapSlots() < maxMapSlots || 
 40           status.countOccupiedReduceSlots() < maxReduceSlots) && 
 41          acceptNewTasks); 
 42       localMinSpaceStart = minSpaceStart;
 43     }
 44     if (askForNewTask) {
 45       askForNewTask = enoughFreeSpace(localMinSpaceStart);  // a free slot is not enough: enoughFreeSpace(...) must also agree
 46       long freeDiskSpace = getFreeSpace();
 47       long totVmem = getTotalVirtualMemoryOnTT();
 48       long totPmem = getTotalPhysicalMemoryOnTT();
 49       long availableVmem = getAvailableVirtualMemoryOnTT();
 50       long availablePmem = getAvailablePhysicalMemoryOnTT();
 51       long cumuCpuTime = getCumulativeCpuTimeOnTT();
 52       long cpuFreq = getCpuFrequencyOnTT();
 53       int numCpu = getNumProcessorsOnTT();
 54       float cpuUsage = getCpuUsageOnTT();
 55 
 56       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
 57       status.getResourceStatus().setTotalVirtualMemory(totVmem);
 58       status.getResourceStatus().setTotalPhysicalMemory(totPmem);
 59       status.getResourceStatus().setMapSlotMemorySizeOnTT(
 60           mapSlotMemorySizeOnTT);
 61       status.getResourceStatus().setReduceSlotMemorySizeOnTT(
 62           reduceSlotSizeMemoryOnTT);
 63       status.getResourceStatus().setAvailableVirtualMemory(availableVmem); 
 64       status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
 65       status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
 66       status.getResourceStatus().setCpuFrequency(cpuFreq);
 67       status.getResourceStatus().setNumProcessors(numCpu);
 68       status.getResourceStatus().setCpuUsage(cpuUsage);
 69     }
 70     // Add node health information to the status
 71     TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
 72     synchronized (this) {
 73       if (healthChecker != null) {
 74         healthChecker.setHealthStatus(healthStatus);
 75       } else {
 76         healthStatus.setNodeHealthy(true);  // no health checker present: report healthy with an empty report
 77         healthStatus.setLastReported(0L);
 78         healthStatus.setHealthReport("");
 79       }
 80     }
 81 
 82 ......
 83 ...// Send the heartbeat report (code elided in this excerpt)
 84 .....
 85     synchronized (this) {
 86       for (TaskStatus taskStatus : status.getTaskReports()) {  // retire reported tasks that are not RUNNING, UNASSIGNED, COMMIT_PENDING or in task-cleanup phase
 87         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
 88             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
 89             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
 90             !taskStatus.inTaskCleanupPhase()) {
 91           if (taskStatus.getIsMap()) {
 92             mapTotal--;
 93           } else {
 94             reduceTotal--;
 95           }
 96           myInstrumentation.completeTask(taskStatus.getTaskID());
 97           runningTasks.remove(taskStatus.getTaskID());
 98         }
 99       }
100 
101 .....
102 // Other code (elided in this excerpt)
103 }
transmitHeartBeat

相关文章: