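The previous post (DAGScheduler-1) covered the basic functionality of DAGScheduler. This post takes a closer look at how a Job is submitted and handled.
Let's first look at the relevant methods and data structures:
Most of these structures are HashMaps.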
1. Job submission
A job is first submitted through SparkContext.submitJob, which in turn calls dagScheduler.submitJob:
/**
 * Submit a job for execution and return a FutureJob holding the result.
 *
 * @param rdd target RDD to run tasks on
 * @param processPartition a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like `first()`
 * @param resultHandler callback to pass each result to
 * @param resultFunc function to be executed when the result is ready
 */
def submitJob[T, U, R](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit,
    resultFunc: => R): SimpleFutureAction[R] = {
  assertNotStopped()
  val cleanF = clean(processPartition)
  val callSite = getCallSite
  val waiter = dagScheduler.submitJob(
    rdd,
    (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
    partitions,
    callSite,
    resultHandler,
    localProperties.get)
  new SimpleFutureAction(waiter, resultFunc)
}
Now for dagScheduler.submitJob itself, whose job is to submit a job to the scheduler:
/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 * or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  // Allocate the job ID
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // Create a waiter that tracks the job until it finishes, then post the
  // JobSubmitted event to the event loop and return without blocking
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
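The shape of this method (validate, allocate an ID, post an event, return immediately) can be shown without Spark. The following is only a minimal sketch: `SubmitSketch`, `JobSubmittedEvent`, `eventQueue` and `submit` are hypothetical names made up for illustration, and a plain `LinkedBlockingQueue` stands in for the scheduler's event loop.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

object SubmitSketch {
  // Hypothetical, simplified stand-in for the JobSubmitted event.
  final case class JobSubmittedEvent(jobId: Int, partitions: Array[Int])

  // Stand-ins for nextJobId and for the queue behind the event loop.
  val nextJobId = new AtomicInteger(0)
  val eventQueue = new LinkedBlockingQueue[JobSubmittedEvent]()

  /** Validates the partitions, allocates a job ID and, if there is work, posts an event. */
  def submit(totalPartitions: Int, partitions: Seq[Int]): Int = {
    // Reject any partition index outside [0, totalPartitions).
    partitions.find(p => p >= totalPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        s"Attempting to access a non-existent partition: $p. " +
          s"Total number of partitions: $totalPartitions")
    }
    val jobId = nextJobId.getAndIncrement()
    // A job with zero tasks still gets an ID but never reaches the queue.
    if (partitions.nonEmpty) {
      eventQueue.put(JobSubmittedEvent(jobId, partitions.toArray))
    }
    jobId
  }
}
```

Note that the caller gets its answer back before anything has been scheduled. The actual work starts only when some other thread takes the event off the queue.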
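As for the returned JobWaiter, its source is worth a look:
It is an object that waits for the job to complete, not merely to be submitted. Each time a task finishes, it passes that task's result to a handler function, namely the resultHandler that was passed in as a parameter.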
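To make the waiter idea concrete, here is a minimal sketch of the pattern: count finished tasks, hand each result to the handler, and complete a promise once every task is done. `SimpleWaiter` is a hypothetical class written for this post, not Spark's JobWaiter, which has further responsibilities such as cancellation.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

// Hypothetical, simplified waiter used only for illustration.
final class SimpleWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private val finishedTasks = new AtomicInteger(0)

  // A job with zero tasks counts as complete from the start.
  private val promise: Promise[Unit] =
    if (totalTasks == 0) Promise.successful(()) else Promise[Unit]()

  /** Completes when all tasks have reported success, or fails when the job fails. */
  def completionFuture: Future[Unit] = promise.future

  /** Called once per finished task with that task's partition index and result. */
  def taskSucceeded(index: Int, result: T): Unit = {
    // Serialize handler calls so the handler itself need not be thread-safe.
    synchronized { resultHandler(index, result) }
    if (finishedTasks.incrementAndGet() == totalTasks) {
      promise.trySuccess(())
      ()
    }
  }

  /** Called when the job as a whole fails. */
  def jobFailed(exception: Exception): Unit = {
    promise.tryFailure(exception)
    ()
  }
}
```

A caller can either block on `completionFuture` or attach callbacks to it, which is what makes the same waiter usable from both the blocking and the non-blocking entry points.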
2. Job execution
// DAGScheduler
/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @note Throws `Exception` when the job fails
 */
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // Submit the job, then block the calling thread until it has finished
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
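The "wait, then inspect the outcome" step of runJob can be tried in isolation. In the sketch below, `RunJobSketch` and `awaitJob` are hypothetical names, and the standard library's `Await.ready` is used in place of Spark's `ThreadUtils.awaitReady`; that substitution is an assumption made to keep the example free of Spark dependencies.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

object RunJobSketch {
  /** Blocks until `completion` finishes, then returns a status line or rethrows the failure. */
  def awaitJob(jobId: Int, completion: Future[Unit]): String = {
    val start = System.nanoTime
    // Await.ready blocks but does not rethrow a failure; the outcome is inspected below.
    Await.ready(completion, Duration.Inf)
    completion.value.get match {
      case Success(_) =>
        "Job %d finished, took %f s".format(jobId, (System.nanoTime - start) / 1e9)
      case Failure(exception) =>
        // Append the caller's stack so the trace also shows where the job was triggered.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }
}
```

Waiting with `Await.ready` rather than `Await.result` is what lets the code log the outcome and patch the stack trace before deciding to rethrow.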
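As with job submission, runJob is also invoked from SparkContext.
DAGScheduler offers two ways to run a job: runJob and runApproximateJob.
The difference between the two is how long the caller waits. runJob blocks until the job completes, as the ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) call above shows. runApproximateJob waits only up to a timeout and returns a partial result if the job has not finished by then. The asynchronous, non-blocking entry point is submitJob, which returns the JobWaiter right away.
Back in SparkContext, runApproximateJob is a developer API. In practice, jobs are mostly executed through runJob.
At this point the job submission flow in DAGScheduler should be reasonably clear.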
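3. Job management
Next, let's look at the other job management operations, which are mainly about cancelling jobs.
The three methods above all cancel jobs, each in a different way. Under the hood, each one simply posts an event to the queue and leaves it to the event-processing thread to handle.
That covers job submission and management. In the next post we will continue with how DAGScheduler submits and manages stages.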
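The "post an event and let the event thread deal with it" idea can be sketched as follows. Everything here is hypothetical and written for illustration only: `CancelSketch`, `Scheduler`, the event types and the method names are not Spark's, and a `LinkedBlockingQueue` drained by a single thread stands in for the scheduler's event loop.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object CancelSketch {
  // Hypothetical event types; the names are illustrative only.
  sealed trait SchedulerEvent
  final case class CancelJob(jobId: Int) extends SchedulerEvent
  case object CancelAll extends SchedulerEvent
  case object Stop extends SchedulerEvent

  final class Scheduler(initialJobs: Set[Int]) {
    private val queue = new LinkedBlockingQueue[SchedulerEvent]()
    // Mutated only by the event thread, so it needs no locking.
    private val active = mutable.Set.empty[Int]
    active ++= initialJobs

    private val eventThread = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running) {
          // take() blocks until an event is available; events are handled in FIFO order.
          queue.take() match {
            case CancelJob(id) => active -= id
            case CancelAll => active.clear()
            case Stop => running = false
          }
        }
      }
    })
    eventThread.setDaemon(true)
    eventThread.start()

    // The public methods only enqueue an event and return immediately.
    def cancel(jobId: Int): Unit = queue.put(CancelJob(jobId))
    def cancelAll(): Unit = queue.put(CancelAll)

    /** Stops the event thread, waits for it to drain the queue, and returns the surviving jobs. */
    def stopAndGetActiveJobs(): Set[Int] = {
      queue.put(Stop)
      eventThread.join()
      active.toSet
    }
  }
}
```

Because all state changes happen on one thread, the cancellation methods stay trivial and the scheduler's bookkeeping needs no locks. The price is that a cancel request takes effect only once the event thread reaches it.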