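TaskTracker节点向JobTracker汇报当前节点的运行时信息的时候,是将运行状态信息同心跳报告一起发送给JobTracker的,主要包括TaskTracker的基本信息、节点资源使用信息、各任务状态等。所有信息被序列化为TaskTrackerStatus实例对象。每次发送心跳报告的时候,会重新构造一个TaskTrackerStatus对象并重置这些信息(如果上一次心跳没有成功送达,status不为null,则不重新构造,而是直接重发上一次的status对象)。需要注意的是,每次发送的status对象的大小并不固定,因为很多信息是按一定条件或时间间隔发送的:例如任务计数器信息每隔COUNTER_UPDATE_INTERVAL(60s)才随心跳发送一次;节点资源使用信息也只有在节点还有空闲slot且可以接收新任务时才会被填充。这些操作主要位于方法transmitHeartBeat的上半部分代码(代码的最后一部分则展示了心跳返回后,将已结束的任务从runningTasks中移除的处理):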
1 HeartbeatResponse transmitHeartBeat(long now) throws IOException { 2 // 计算是否发送任务计数器信息,间隔时间为${COUNTER_UPDATE_INTERVAL}对应的值为60s,不支持配置 3 boolean sendCounters; 4 if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) { 5 sendCounters = true; 6 previousUpdate = now; 7 } 8 else { 9 sendCounters = false; 10 } 11 12 // 13 // Check if the last heartbeat got through... 14 // if so then build the heartbeat information for the JobTracker; 15 // else resend the previous status information. 16 // 17 if (status == null) { 18 synchronized (this) { 19 status = new TaskTrackerStatus(taskTrackerName, localHostname, 20 httpPort, 21 cloneAndResetRunningTaskStatuses( 22 sendCounters), 23 failures, 24 maxMapSlots, 25 maxReduceSlots); 26 } 27 } else { 28 LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() + 29 "' with reponseId '" + heartbeatResponseId); 30 } 31 32 // 33 // Check if we should ask for a new Task 34 // 计算节点资源使用信息 35 boolean askForNewTask; 36 long localMinSpaceStart; 37 synchronized (this) { 38 askForNewTask = 39 ((status.countOccupiedMapSlots() < maxMapSlots || 40 status.countOccupiedReduceSlots() < maxReduceSlots) && 41 acceptNewTasks); 42 localMinSpaceStart = minSpaceStart; 43 } 44 if (askForNewTask) { 45 askForNewTask = enoughFreeSpace(localMinSpaceStart); 46 long freeDiskSpace = getFreeSpace(); 47 long totVmem = getTotalVirtualMemoryOnTT(); 48 long totPmem = getTotalPhysicalMemoryOnTT(); 49 long availableVmem = getAvailableVirtualMemoryOnTT(); 50 long availablePmem = getAvailablePhysicalMemoryOnTT(); 51 long cumuCpuTime = getCumulativeCpuTimeOnTT(); 52 long cpuFreq = getCpuFrequencyOnTT(); 53 int numCpu = getNumProcessorsOnTT(); 54 float cpuUsage = getCpuUsageOnTT(); 55 56 status.getResourceStatus().setAvailableSpace(freeDiskSpace); 57 status.getResourceStatus().setTotalVirtualMemory(totVmem); 58 status.getResourceStatus().setTotalPhysicalMemory(totPmem); 59 status.getResourceStatus().setMapSlotMemorySizeOnTT( 60 mapSlotMemorySizeOnTT); 61 
status.getResourceStatus().setReduceSlotMemorySizeOnTT( 62 reduceSlotSizeMemoryOnTT); 63 status.getResourceStatus().setAvailableVirtualMemory(availableVmem); 64 status.getResourceStatus().setAvailablePhysicalMemory(availablePmem); 65 status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime); 66 status.getResourceStatus().setCpuFrequency(cpuFreq); 67 status.getResourceStatus().setNumProcessors(numCpu); 68 status.getResourceStatus().setCpuUsage(cpuUsage); 69 } 70 //add node health information 添加节点健康状态 71 TaskTrackerHealthStatus healthStatus = status.getHealthStatus(); 72 synchronized (this) { 73 if (healthChecker != null) { 74 healthChecker.setHealthStatus(healthStatus); 75 } else { 76 healthStatus.setNodeHealthy(true); 77 healthStatus.setLastReported(0L); 78 healthStatus.setHealthReport(""); 79 } 80 } 81 82 ...... 83 ...//发送心跳报告 84 ..... 85 synchronized (this) { 86 for (TaskStatus taskStatus : status.getTaskReports()) { 87 if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 88 taskStatus.getRunState() != TaskStatus.State.UNASSIGNED && 89 taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING && 90 !taskStatus.inTaskCleanupPhase()) { 91 if (taskStatus.getIsMap()) { 92 mapTotal--; 93 } else { 94 reduceTotal--; 95 } 96 myInstrumentation.completeTask(taskStatus.getTaskID()); 97 runningTasks.remove(taskStatus.getTaskID()); 98 } 99 } 100 101 ..... 102 // 其他代码 103 }