![]()

`Master` 类位于 `spark-core_2.11-2.1.0.jar` 中，完整类名为 `org.apache.spark.deploy.master.Master`。
/**
 * Completes master recovery, i.e. the final step of a master active/standby
 * switch (failover).
 *
 * Does nothing unless `state` is `RecoveryState.RECOVERING`. Otherwise it:
 *  1. moves `state` to `RecoveryState.COMPLETING_RECOVERY` so a second call is a no-op;
 *  2. removes every worker still in `WorkerState.UNKNOWN` (via `removeWorker`) and
 *     finishes every application still in `ApplicationState.UNKNOWN`
 *     (via `finishApplication`) — i.e. those that never responded;
 *  3. for every driver with no assigned worker: relaunches it if its description is
 *     supervised, otherwise removes it with `DriverState.ERROR`;
 *  4. sets `state` to `RecoveryState.ALIVE` and calls `schedule()`.
 */
private def completeRecovery(): Unit = {
  // Ensure "only-once" recovery semantics using a short synchronization period:
  // any call made while not RECOVERING (including one arriving after the switch
  // to COMPLETING_RECOVERY below) returns immediately.
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off any workers and apps that didn't respond to us: anything still UNKNOWN
  // at this point is treated as possibly failed or already dead and is cleaned up.
  // NOTE(review): per the original author's notes, cleanup removes the entry from the
  // master's in-memory state, from related components' caches, and from persistent
  // storage — that behavior lives inside removeWorker / finishApplication; confirm there.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Reschedule drivers which were not claimed by any workers: supervised drivers are
  // relaunched, unsupervised ones are removed in the ERROR state.
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  // Resume normal operation. The sets of workers, apps and drivers may have changed
  // above, so run scheduling again.
  state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}