Spark application 支持自定义listener,用户可以实时获取任务状态给自己的监控系统,可以获取以下几个状态:
/**
 * Callback interface for events raised while a Spark application runs.
 *
 * Every callback has an empty default body, so an implementation only needs to
 * override the events it is interested in.
 */
trait SparkListener {

  /**
   * Called when a stage completes successfully or fails, with information on the completed stage.
   */
  def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }

  /**
   * Called when a stage is submitted.
   */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }

  /**
   * Called when a task starts.
   */
  def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }

  /**
   * Called when a task begins remotely fetching its result (will not be called for tasks that do
   * not need to fetch the result remotely).
   */
  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }

  /**
   * Called when a task ends.
   */
  def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }

  /**
   * Called when a job starts.
   */
  def onJobStart(jobStart: SparkListenerJobStart): Unit = { }

  /**
   * Called when a job ends, whether it succeeded or failed.
   */
  def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }

  /**
   * Called when environment properties have been updated.
   */
  def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }

  /**
   * Called when a new block manager has joined.
   */
  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }

  /**
   * Called when an existing block manager has been removed.
   */
  def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }

  /**
   * Called when an RDD is manually unpersisted by the application.
   */
  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }

  /**
   * Called when the application starts.
   */
  def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }

  /**
   * Called when the application ends.
   */
  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }

  /**
   * Called when the driver receives task metrics from an executor in a heartbeat.
   */
  def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

  /**
   * Called when the driver registers a new executor.
   */
  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }

  /**
   * Called when the driver removes an executor.
   */
  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

  /**
   * Called when the driver receives a block update info.
   */
  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
}
只需要 extends SparkListener,然后注册到 SparkContext,即可实现自定义 listener,代码逻辑如下:
package com.suning.spark
import org.apache.spark.scheduler.MySparkListener
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demo driver: registers a [[org.apache.spark.scheduler.MySparkListener]] on a local
 * SparkContext and then runs two jobs, one that completes and one that deliberately throws.
 *
 * Created by Ricky on 2016/4/14 0014.
 */
object JobProcesser {

  /**
   * Runs the demo.
   *
   * The SparkContext is always stopped, even though the second job throws and the
   * exception propagates out of this method.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaWordCountProducer").setMaster("local")
    val sc = new SparkContext(sparkConf)
    try {
      /* sc.setJobGroup("test1","testdesc")
      val completedJobs= sc.jobProgressListener*/
      sc.addSparkListener(new MySparkListener)

      val rdd1 = sc.parallelize(List(('a', 'c', 1), ('b', 'a', 1), ('b', 'd', 8)))
      val rdd2 = sc.parallelize(List(('a', 'c', 2), ('b', 'c', 5), ('b', 'd', 6)))

      // First job. Despite its name, rdd3 holds the result of count(), not an RDD.
      // The per-element sleep presumably just keeps the job alive long enough to observe.
      val rdd3 = rdd1.union(rdd2).map { x =>
        Thread.sleep(500)
        x
      }.count()

      // Second job. Every element is mapped to 0, so the check below throws for each one;
      // this looks intended to exercise the listener's job-failure path.
      rdd1.map(x => 0.2).map(x => 0).map { x =>
        if (x == 0) {
          throw new Exception("my exeception")
        }
        x
      }.reduce(_ + _)

      // Only reached if the job above did not throw.
      println(rdd3)
    } finally {
      // Release the context on both the normal and the failure path.
      sc.stop()
    }
  }
}
package org.apache.spark.scheduler
/**
 * Sample listener that prints a banner to standard output when a job ends and when the
 * application ends.
 *
 * Created by Ricky on 2016/4/14 0014.
 */
class MySparkListener extends SparkListener {

  // Separator line printed before and after every report.
  private val banner = "*************************************************"

  /**
   * Prints an "app:end" report.
   *
   * @param applicationEnd the application-end event (its contents are not inspected)
   */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    println(banner)
    println("app:end")
    println(banner)
  }

  /**
   * Prints a "job:end" report followed by the job outcome; for a failed job the
   * exception's stack trace is printed as well.
   *
   * @param jobEnd the job-end event whose `jobResult` is reported
   */
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(banner)
    println("job:end")
    jobEnd.jobResult match {
      case JobSucceeded =>
        println("job:end:JobSucceeded")
      case JobFailed(exception) =>
        println("job:end:JobFailed")
        exception.printStackTrace()
    }
    println(banner)
  }
}
事件总线运行于driver端,事件来源如上图。