简介
ExecutorAllocationManager在SparkContext中创建,基于工作负载通过后台定时线程来动态分配、移除Executor资源及取消多余的executor申请。ExecutorAllocationManager保存一个动态目标数量的executor信息,并定时周期同步给cluster manager。开始时从配置信息中得到目标executor数量的初始值,然后基于等待及运行的task的数量进行改变。
当目前的目标数量超过了处理目前工作负载的需求时则减少目标数量。目标executor的数量总是被截断到可同时运行当前所有运行中及等待的task总和的exeuctor数量上。作为对等待调度的积压的task的响应,ExecutorAllocationManager会增加目标executor的数量。移除策略很简单:如果一个executor已经空闲了K秒(表示它没有被调度来运行任何task),则它会被移除。
如果调用队列在
N秒内还没有被消耗完,则新的executor会被添加。如果调用队列再有M秒没有被消耗完,则更多的exeuctor会被添加,以此循环往复。每次循环中添加的executor的数量呈指数级增长,直到增长到最大值为止(上界值是同时基于配置值和现在运行中及等待中的task数量决定的)。指数级增长的系数是两方面的:1)在开始时executor应该缓慢地增长,以免额外需要的executor数量可能是很小的。否则,我们可能添加了超过我们需要的executor,并随后需要移除它们。2) 随着时间积累,executor应该快速添加,以免最大需求executor数量很大。否则,在高工作负载下需要花费很长时间来满足。
增加和移除executor的两种情形下都没有重试逻辑,因为我们假设cluster manager会最终满足它异步收到地所有请求。
ExecutorAllocationManager创建和启动
-
SparkContext中创建ExecutorAllocationManager,基于工作负载动态分配executors的数量。只有在本地模式或者测试环境下时该功能才有效,此时可通过参数
spark.dynamicAllocation.enabled控制,默认为false。// 是否开启动态分配executor,通过参数spark.dynamicAllocation.enabled控制,默认为false // 只有非本地模式或者测试环境下时该参数才有效,并且当schedulerBackend不是ExecutorAllocationClient的子类时也无效 // 该功能对测试也开放 val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) _executorAllocationManager = if (dynamicAllocationEnabled) { schedulerBackend match { case b: ExecutorAllocationClient => Some(new ExecutorAllocationManager( schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf)) case _ => None } } else { None } // 启动ExecutorAllocationManager _executorAllocationManager.foreach(_.start() ... // 将executorAllocationManagerSource注册到driver实例的metricsSystem中 _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } -
ExecutorAllocationManager创建好后,调用其
start()方法启动。
executorAllocationManager.start()方法将ExecutorAllocationListener加入listenerBus中,来监听listenerBus里添加及删除Executor的事件。然后启动一个后台定时线程,添加Executor,并遍历Executor将超时的Executor杀掉并移除。
// 用于监听影响分配策略的spark事件
private val listener = new ExecutorAllocationListener
// Executor that handles the scheduling task.
private val executor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
// 注册回调方法来决定何时增加及移除executor,并且启动定时周期性调度任务
def start(): Unit = {
listenerBus.addListener(listener)
val scheduleTask = new Runnable() {
override def run(): Unit = {
try {
schedule()
} catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
}
}
}
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
// 该方法被定时周期调用来调节空闲等待请求的executor数量及运行中executor的数量。
// 首先,基于添加时间和我们目前的需求来调整我们请求的executor数。
// 然后,如果一个存在的executor的移除时间已经过期,则杀死这个exeuctor。
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
// 更新我们的目标exeuctor数量,并和cluster manager同步这个结果。
// 检查我们已分配和已请求的executor数量是否超出了我们目前的需求。
// 如果已经超出,则清空我们的目标并让cluster manager知道,以让它可以取消不需要的处于等待中的executor请求。
updateAndSyncNumExecutorsTarget(now)
val executorIdsToBeRemoved = ArrayBuffer[String]()
// retain方法的含义是只返回函数值为true的结果,即将超时的值从removeTimes中踢除
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
executorIdsToBeRemoved += executorId
}
!expired
}
// 移除超时的executor
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
}
相关配置参数
相关的spark配置参数如下:
- spark.dynamicAllocation.enabled:表示动态分配executor功能是否被开启。
- spark.dynamicAllocation.minExecutors:分配executor数量的下界值。
- spark.dynamicAllocation.maxExecutors:分配executor数量的上界值。
- spark.dynamicAllocation.schedulerBacklogTimeout (M):task积压的超时时间,如果超过则添加新的executor,并随着时间积累,exeuctor增长数会指数级增长。
- spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N):task积压的超时时间,如果超过则添加新的executor。该参数仅用于初始积压时间超时时。
- spark.dynamicAllocation.executorIdleTimeout (K):executor空闲超时时间,如果超过则移除它。