Executor启动完了,接下来就准备在executor上执行task了,关于task任务的执行,就是我们接下来要说的TaskScheduler和DAGScheduler了。
TaskScheduler作用是为创建它的SparkContext调度任务,即从DAGScheduler接受不同Stage的任务,并且向集群提交这些任务
DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,其中每个Stage由可以执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据,而且DAGScheduler在不同的资源管理框架下的实现是相同的,DAGScheduler将这组Task划分完成后,会将这组Task提交到TaskScheduler,TaskScheduler通过Cluster Manager在集群的某个Worker的Executor上启动任务
在SparkPi程序中,使用RDD的action算子,会触发Spark任务,即在SparkPi程序中的reduce算子,触发了任务的执行
object SparkPi {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
val spark = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
看一下reduce算子,查看触发DAGScheduler的流程
在reduce中发现,其实调用了sparkContext的runJob方法
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
SparkContext的runJob方法,其中调用了多层runJob方法
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
第一层runJob
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
第二层runJob
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
第三层runJob
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
进入DAGScheduler.scala
runJob方法中调用了submitJob方法,用于提交job,该方法返回一个JobWaiter,用于等待DAGScheduler任务完成
/**
* 在给定的RDD上执行一个action job并将所有结果传递给resultHandler函数
*/
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
submitJob方法,调用eventProcessLoop post将JobSubmitted事件添加到DAGScheduler事件队列中
这个eventProcessLoop是DAGSchedulerEventProcessLoop,DAGSchedulerEventProcessLoop是来对DAGScheduler主要事件进行管理
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this)
提交的JobSubmitted事件,实际上调用了DAGScheduler的handleJobSubmitted方法
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
到这里,我们找到了DAGScheduler调度的核心入口handleJobSubmitted
一共做了四个操作:
1.使用触发job的最后一个rdd,创建finalStage
这说一下stage,stage有两种,一种是ShuffleMapStage,另一种是ResultStage
ShuffleMapStage用来处理Shuffle
ResultStage是最后执行的Stage,用于任务的收尾工作
ResultStage之前的stage都是ShuffleMapStage
2.用finalStage创建一个Job,这个job的最后一个stage,就是finalStage
3.将Job相关信息,加入内存缓冲中
4.第四步,使用submitStage方法提交finalStage
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// 第一步:
//使用触发job的最后一个RDD,创建finalStage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
//第二步:用finalStage创建一个Job
//这个job的最后一个stage,就是finalStage
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
//第三步,将Job相关信息,加入内存缓冲中
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
//第四步,使用submitStage方法提交finalStage
//这个方法的调用,其实会提交第一个stage,并将其他的stage放到waitingStages队列中
submitStage(finalStage)
}
submitStage方法用于提交stage
1.对stage的active job id进行验证,如果存在,进行第2步,如果不存在,则abortStage终止
2.在提交这个stage之前,先要判断当前stage不是等待中/运行中/失败的stage,如果都不是,进行第3步,如果是,则abortStage终止
3.调用getMissingParentStages方法,获取当前stage的所有未提交的父stage
如果不存在stage未提交的父stage,调用submitMissingTasks方法,提交当前stage所有未提交的task
如果存在未提交的父Stage,递归调用submitStage提交所有未提交的父stage(即如果这个stage一直有未提交的父stage,一直进行调用,直到最开始的stage 0),并将当前stage加入waitingStages等待执行队列中
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
//调用getMissingParentStages获取当前这个stage的父stage
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
/*
这里会反复递归调用,直到最开始的stage,它没有父stage,此时就会提交这个最开始的stage,stage0
其余的stage都在waitingStages中
*/
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
//如果有父stage
//递归调用submitStage()方法,去提交父stage
//这里的递归就是stage划分算法的推动和亮点!
for (parent <- missing) {
submitStage(parent)
}
//并且将当前的stage添加到waitingStages等待执行的队列中
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
其中,调用的getMissingParentStages方法,就涉及到了stage的划分算法
在划分算法中做了如下操作:
1.创建了一个存放rdd的栈
2.往栈中推入了stage的最后一个rdd
3.如果栈不为空,对这个rdd调用内部定义的visit方法
在visit方法中做了如下操作:
1.遍历传入rdd的依赖
2.如果是宽依赖,调用getOrCreateShuffleMapStage方法,创建一个新的stage
如果是窄依赖,将rdd放入栈中
3.将新的stage list返回
/*
获取某个stage的父stage
这个方法,对于一个stage,如果它的最后一个rdd的所有依赖,都是窄依赖,那么就不会创建任何新的stage
但只要发现这个stage的rdd宽依赖了某个rdd,那么就用宽依赖的那个rdd,创建一个新的stage
然后立即将新的stage返回
*/
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
//遍历RDD的依赖
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
dep match {
//如果是宽依赖
case shufDep: ShuffleDependency[_, _, _] =>
// 那么使用宽依赖的那个RDD,使用getOrCreateShuffleMapStage()方法去创建一个stage
// 默认最后一个stage,不是shuffleMap stage
// 但是finalStage之前所有的stage,都是shuffleMap Stage
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
//如果是窄依赖,将rdd放入栈中
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
//首先往栈中,推入了stage最后的一个rdd finalStage
waitingForVisit.push(stage.rdd)
//进行while循环
while (waitingForVisit.nonEmpty) {
// 对stage的最后一个rdd,调用自己内部定义的visit()方法
visit(waitingForVisit.pop())
}
missing.toList
}
顺便看一下getOrCreateShuffleMapStage方法,用于创建stage
方法中调用createShuffleMapStage方法,为给定的宽依赖创建ShuffleMapStage
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
...
// 为给定的shuffle依赖创建一个stage
createShuffleMapStage(shuffleDep, firstJobId)
}
}
createShuffleMapStage会创建一个新的ShuffleMapStage
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
stage的创建和划分就完成了。
在Spark Web UI上看到当前的stage如下:
查看Active Stage为1
点击Description,查看stage描述
查看Spark Pi程序生成的DAG图如下:
因为程序中不含有shuffle操作,即没有宽依赖,所以只有一个stage,为finalStage。
DAGScheduler的stage的划分算法很重要,想要很好的掌握spark,要对stage划分算法很清晰,知道自己编写的spark application被划分了几个job,每个job被划分成了几个stage,每个stage包含了哪些代码,定位到代码。
当我们遇到异常时,发现stage运行过慢,或者stage报错,我们能针对那个stage对应的代码去排查问题,或者性能调优。