The previous few articles studied the source code of DAGScheduler. Starting with this one, we study Executor, the Spark component that executes tasks.
1. Structure Overview
First, let's look at which classes and objects the executor package contains:
It is not very complicated.
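Now to the main topic: the internal structure of Executor. As with the other components, it consists of the Executor class and its companion object:
Spark's Executor is backed by a thread pool that runs tasks. Except in Mesos fine-grained mode, it communicates with the driver through an internal RPC interface.
Here are its data structures:
The main ones are:
- threadPool, the thread pool.
- taskReaperPool and taskReaperForTask, which keep track of tasks (and their threads) that are being killed.
- heartbeater and heartbeatReceiverRef, which send heartbeats and hold a reference to the driver-side heartbeat receiver.
- runningTasks, which tracks the tasks currently executing.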
2. Initialization
During initialization, the executor creates a thread pool:
// Start worker thread pool
private val threadPool = {
  val threadFactory = new ThreadFactoryBuilder()
    .setDaemon(true)
    .setNameFormat("Executor task launch worker-%d")
    .setThreadFactory(new ThreadFactory {
      override def newThread(r: Runnable): Thread =
        // Use UninterruptibleThread to run tasks so that we can allow running codes without being
        // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
        // will hang forever if some methods are interrupted.
        new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
    })
    .build()
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
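It is really just a ThreadPoolExecutor, created with Executors.newCachedThreadPool and given a custom ThreadFactory (the Java interface for creating threads). Guava's ThreadFactoryBuilder configures that factory to produce daemon threads named "Executor task launch worker-N". Each thread is created as an UninterruptibleThread.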
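The following minimal sketch shows how such a pool behaves without Guava or Spark on the classpath. It assumes that a hand-written, hypothetical NamedDaemonThreadFactory can stand in for ThreadFactoryBuilder, and that a plain Thread can stand in for UninterruptibleThread. It builds the same kind of cached pool and reports the name and daemon status of the worker that runs a task.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for Guava's ThreadFactoryBuilder: daemon threads named
// from a format string, numbered from 0 in this sketch.
class NamedDaemonThreadFactory(nameFormat: String) extends ThreadFactory {
  private val counter = new AtomicInteger(0)
  override def newThread(r: Runnable): Thread = {
    // Spark creates an UninterruptibleThread here; a plain Thread keeps the sketch self-contained.
    val t = new Thread(r, nameFormat.format(counter.getAndIncrement()))
    t.setDaemon(true)
    t
  }
}

val threadPool: ThreadPoolExecutor =
  Executors.newCachedThreadPool(new NamedDaemonThreadFactory("Executor task launch worker-%d"))
    .asInstanceOf[ThreadPoolExecutor]

// Usage: run one task and report which thread it ran on.
val (workerName, workerIsDaemon) = threadPool.submit(new Callable[(String, Boolean)] {
  override def call(): (String, Boolean) =
    (Thread.currentThread().getName, Thread.currentThread().isDaemon)
}).get(5, TimeUnit.SECONDS)
threadPool.shutdown()
```

A cached pool creates threads on demand and reuses idle ones, so the pool itself does not cap concurrency. The number of tasks in flight is bounded by how many the driver schedules onto this executor.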
Here is how several of the other data structures are initialized:
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
// must be initialized before running startDriverHeartbeater()
private val heartbeatReceiverRef =
  RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
In addition, when the executor is initialized, it starts a periodic task that reports heartbeats to the driver:
/**
 * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
 */
private def startDriverHeartbeater(): Unit = {
  val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
  // Wait a random interval so the heartbeats don't end up in sync
  val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
  val heartbeatTask = new Runnable() {
    override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
  }
  heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
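The following sketch tries out the jittered fixed-rate schedule in isolation. It rests on several assumptions:

- A plain single-thread ScheduledExecutorService with a daemon thread replaces ThreadUtils.newDaemonSingleThreadScheduledExecutor.
- A hypothetical reportHeartBeat only counts beats instead of sending metrics to the driver.
- The interval is shortened to 20 ms.

The sketch also catches exceptions inside the task. A ScheduledExecutorService suppresses all later runs of a periodic task once one run throws.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

// Stand-in for ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater").
val heartbeater = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "driver-heartbeater")
    t.setDaemon(true)
    t
  }
})

// Same rule as startDriverHeartbeater: first beat after interval plus a random [0, interval) jitter.
def initialDelayMs(intervalMs: Long): Long =
  intervalMs + (java.lang.Math.random() * intervalMs).toLong

// Hypothetical reportHeartBeat(): only counts beats.
val beats = new AtomicInteger(0)
val threeBeats = new CountDownLatch(3)
def reportHeartBeat(): Unit = { beats.incrementAndGet(); threeBeats.countDown() }

val intervalMs = 20L // spark.executor.heartbeatInterval defaults to 10s; shortened for the demo
heartbeater.scheduleAtFixedRate(new Runnable {
  // Catch here: one escaping exception would suppress every later run of a periodic task.
  override def run(): Unit =
    try reportHeartBeat() catch { case NonFatal(e) => e.printStackTrace() }
}, initialDelayMs(intervalMs), intervalMs, TimeUnit.MILLISECONDS)

val gotThreeBeats = threeBeats.await(5, TimeUnit.SECONDS)
heartbeater.shutdownNow()
```

The random initial delay spreads executors that start at the same moment across the interval, so their heartbeats do not all hit the driver at once.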
3. Scheduling Flow
Let's look at its main methods:
These methods are all called from the corresponding methods of CoarseGrainedExecutorBackend, which holds an Executor object.
Here is CoarseGrainedExecutorBackend's startup method, onStart:
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
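In essence, onStart registers the executor with the driver. It resolves the driver's endpoint reference from driverUrl and sends it a RegisterExecutor message. If registration fails, the backend exits.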
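The same handshake can be sketched with plain Scala futures. Everything RPC-related here is hypothetical:

- DriverRef stands in for RpcEndpointRef.
- The RegisterExecutor case class carries only a few made-up fields.
- exitExecutor is passed in as a function.
- The driver URL is an invented example.

The sameThread execution context imitates the role of ThreadUtils.sameThread: it runs each callback directly on whichever thread triggers it.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical stand-ins for the RPC layer.
case class RegisterExecutor(executorId: String, hostname: String, cores: Int)
trait DriverRef { def ask(msg: RegisterExecutor): Future[Boolean] }

// Runs callbacks on the calling thread, the role ThreadUtils.sameThread plays in Spark.
val sameThread: ExecutionContext = new ExecutionContext {
  override def execute(r: Runnable): Unit = r.run()
  override def reportFailure(t: Throwable): Unit = t.printStackTrace()
}

def onStart(driverUrl: String,
            connect: => Future[DriverRef],
            exitExecutor: (String, Throwable) => Unit): Future[Boolean] = {
  // Resolve the driver endpoint, then ask it to register this executor
  // (executor ID, host and core count are hypothetical values).
  val registered = connect.flatMap(ref => ref.ask(RegisterExecutor("1", "host-a", 4)))(sameThread)
  registered.onComplete {
    case Success(_) => () // the reply is always true and is ignored
    case Failure(e) => exitExecutor(s"Cannot register with driver: $driverUrl", e)
  }(sameThread)
  registered
}

// Usage: one driver that accepts the registration, one that cannot be reached.
val acceptingDriver = new DriverRef {
  override def ask(msg: RegisterExecutor): Future[Boolean] = Future.successful(true)
}
val okReply = Await.result(
  onStart("spark://driver-host:7077", Future.successful(acceptingDriver), (_, _) => ()), 1.second)

val exitReason = Promise[String]()
onStart("spark://driver-host:7077", Future.failed(new RuntimeException("connection refused")),
  (msg, e) => { exitReason.trySuccess(s"$msg (${e.getMessage})"); () })
val failureMessage = Await.result(exitReason.future, 1.second)
```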
Now let's see how the receive method handles incoming messages:
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      // After successful registration, create the corresponding Executor locally
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
  case RegisterExecutorFailed(message) =>
    exitExecutor(1, "Slave registration failed: " + message)
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc)
    }
  case KillTask(taskId, _, interruptThread, reason) =>
    if (executor == null) {
      exitExecutor(1, "Received KillTask command but executor was null")
    } else {
      executor.killTask(taskId, interruptThread, reason)
    }
  case StopExecutor =>
    stopping.set(true)
    logInfo("Driver commanded a shutdown")
    // Cannot shutdown here because an ack may need to be sent back to the caller. So send
    // a message to self to actually do the shutdown.
    self.send(Shutdown)
  case Shutdown =>
    stopping.set(true)
    new Thread("CoarseGrainedExecutorBackend-stop-executor") {
      override def run(): Unit = {
        // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
        // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
        // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
        // Therefore, we put this line in a new thread.
        executor.stop()
      }
    }.start()
  case UpdateDelegationTokens(tokenBytes) =>
    logInfo(s"Received tokens of ${tokenBytes.length} bytes")
    SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
}
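Much like the doOnReceive method in DAGScheduler's event loop, receive dispatches each message by pattern matching and does the corresponding work. Take task launching as an example. When the driver sends a LaunchTask command, the backend first checks that the executor exists (otherwise it exits). It then decodes the TaskDescription the driver sent and calls executor.launchTask to start the task.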
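The dispatch pattern itself is plain Scala. The following sketch exercises the branches of receive without an RpcEnv. It assumes simplified, hypothetical message types: LaunchTask carries an ID and a name instead of a serialized TaskDescription. It also assumes a RecordingExecutor that only logs the calls it gets, and an exitExecutor that records the exit instead of terminating the process.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified messages: Spark's LaunchTask wraps a serialized TaskDescription.
sealed trait ExecutorMessage
case object RegisteredExecutor extends ExecutorMessage
case class LaunchTask(taskId: Long, name: String) extends ExecutorMessage
case class KillTask(taskId: Long, interruptThread: Boolean, reason: String) extends ExecutorMessage
case object StopExecutor extends ExecutorMessage

// Stand-in executor that only records the calls it receives.
class RecordingExecutor {
  val calls = ArrayBuffer.empty[String]
  def launchTask(taskId: Long, name: String): Unit = calls += s"launch $taskId ($name)"
  def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit =
    calls += s"kill $taskId: $reason"
}

class BackendSketch {
  @volatile var executor: RecordingExecutor = null
  @volatile var stopping = false
  var exited: Option[(Int, String)] = None

  // Hypothetical: records the exit instead of terminating the process.
  private def exitExecutor(code: Int, reason: String): Unit = exited = Some((code, reason))

  val receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      executor = new RecordingExecutor // created only after the driver confirms registration
    case LaunchTask(taskId, name) =>
      if (executor == null) exitExecutor(1, "Received LaunchTask command but executor was null")
      else executor.launchTask(taskId, name)
    case KillTask(taskId, interruptThread, reason) =>
      if (executor == null) exitExecutor(1, "Received KillTask command but executor was null")
      else executor.killTask(taskId, interruptThread, reason)
    case StopExecutor =>
      stopping = true // the real backend then sends itself Shutdown
  }
}

// Usage: a launch before registration exits; after registration, messages reach the executor.
val early = new BackendSketch
early.receive(LaunchTask(1L, "task 1.0"))

val backend = new BackendSketch
Seq(RegisteredExecutor, LaunchTask(7L, "task 7.0"), KillTask(7L, true, "stage cancelled"), StopExecutor)
  .foreach(backend.receive)
```

Because receive is a PartialFunction, any message without a matching case is simply not handled here. In the sketch, receive.isDefinedAt returns false for such messages.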
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
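This is how the executor launches a task. It creates a TaskRunner, records it in runningTasks under the task ID, and hands it to threadPool, which runs it on one of its worker threads.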
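The following minimal sketch shows this bookkeeping. It assumes a hypothetical TaskRunnerSketch whose work is just a closure. The runner is registered in runningTasks before it is handed to the pool, and it removes itself in a finally block when it finishes. That self-removal is an assumption of the sketch, intended to mirror the cleanup TaskRunner performs at the end of run().

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}

// Hypothetical stripped-down runner: the task body is just a closure.
class TaskRunnerSketch(val taskId: Long,
                       body: () => Unit,
                       runningTasks: ConcurrentHashMap[Long, TaskRunnerSketch]) extends Runnable {
  override def run(): Unit =
    try body()
    finally runningTasks.remove(taskId) // assumed cleanup: drop the entry once the task ends
}

val runningTasks = new ConcurrentHashMap[Long, TaskRunnerSketch]
val threadPool = Executors.newCachedThreadPool()

// Same three steps as Executor.launchTask: wrap, record, submit.
def launchTask(taskId: Long, body: () => Unit): Unit = {
  val tr = new TaskRunnerSketch(taskId, body, runningTasks)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

// Usage: a task that blocks until released, so it can be observed while running.
val release = new CountDownLatch(1)
launchTask(42L, () => release.await())
val visibleWhileRunning = runningTasks.containsKey(42L)
release.countDown()
threadPool.shutdown()
threadPool.awaitTermination(5, TimeUnit.SECONDS)
val visibleAfterwards = runningTasks.containsKey(42L)
```

The entry goes into runningTasks before threadPool.execute. As a result, a KillTask or heartbeat that arrives right after the launch can already find the task.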
TaskRunner is not itself a thread but a Runnable that a pool thread executes:
class TaskRunner(
    execBackend: ExecutorBackend,
    private val taskDescription: TaskDescription)
  extends Runnable
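It holds what the task needs: the ExecutorBackend used to report status back (execBackend) and the task's TaskDescription.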
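Each task is tracked in runningTasks. Killing one (the KillTask branch of receive, which calls executor.killTask) therefore comes down to looking up its runner and asking it to stop. The following sketch illustrates cooperative cancellation. It assumes a hypothetical KillableRunner that polls a volatile kill reason and can optionally interrupt its worker thread. It is not Spark's actual TaskRunner.kill, which also involves the task reaper when that is enabled.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}

// Hypothetical runner: the "work" loop polls a volatile kill reason, and
// kill(interruptThread = true) additionally interrupts the worker thread.
class KillableRunner(val taskId: Long) extends Runnable {
  @volatile private var reasonIfKilled: Option[String] = None
  @volatile private var worker: Thread = null
  @volatile var outcome: String = "not finished"
  val started = new CountDownLatch(1)

  def kill(interruptThread: Boolean, reason: String): Unit = {
    reasonIfKilled = Some(reason) // set the reason first so the task can see why it stopped
    val t = worker
    if (interruptThread && t != null) t.interrupt()
  }

  override def run(): Unit = {
    worker = Thread.currentThread()
    started.countDown()
    try {
      while (reasonIfKilled.isEmpty) Thread.sleep(10) // stand-in for the task's real work
      outcome = s"killed: ${reasonIfKilled.get}"
    } catch {
      case _: InterruptedException => outcome = s"interrupted: ${reasonIfKilled.getOrElse("unknown")}"
    }
  }
}

val runningTasks = new ConcurrentHashMap[Long, KillableRunner]

// Same shape as Executor.killTask without the task-reaper branch: unknown IDs are ignored.
def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
  val runner = runningTasks.get(taskId)
  if (runner != null) runner.kill(interruptThread, reason)
}

// Usage: launch one runner, then kill it without interrupting its thread.
val pool = Executors.newCachedThreadPool()
val runner = new KillableRunner(7L)
runningTasks.put(7L, runner)
pool.execute(runner)
runner.started.await(5, TimeUnit.SECONDS)
killTask(7L, interruptThread = false, reason = "stage cancelled")
killTask(99L, interruptThread = true, reason = "no such task") // not in runningTasks: ignored
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)
```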
At this point the executor's task-execution flow is clear. The other commands, such as KillTask, are handled in much the same way.
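The executor's heartbeat reporting:
Finally, a summary:
In effect, the Executor is the worker behind CoarseGrainedExecutorBackend, which acts as its proxy toward the driver. Every command the driver sends is handled in CoarseGrainedExecutorBackend, whose receive method dispatches it. That is where the executor is registered and where the executor's interfaces are called to manage tasks. The Executor does the actual work of running and managing tasks: it manages the thread pool that runs them and records their state.
That is all on the Executor source code for now.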