【发布时间】:2018-07-24 13:45:24
【问题描述】:
我想计算 Spark 上的进程速率。使用 Spark Listener,可以获取运行阶段 id。但是我怎样才能获得应用程序包含的阶段总数。我应该手动制作吗?谢谢。
【问题讨论】:
标签: apache-spark listener stage
我想计算 Spark 上的进程速率。使用 Spark Listener,可以获取运行阶段 id。但是我怎样才能获得应用程序包含的阶段总数。我应该手动制作吗?谢谢。
【问题讨论】:
标签: apache-spark listener stage
您可以通过计算从org.apache.spark.scheduler.SparkListenerJobStart 事件侦听器类中获得的StageInfos 数量来获得它。
您必须为onJobStart 事件注册事件侦听器(在此doc 中找到)。
我发现每次完成任务时计算任务总数并增加完成的任务总数会更有用:
package com.xxx.xxx
import java.util.concurrent.TimeUnit
import com.codahale.metrics.{Counter, MetricRegistry, ScheduledReporter}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd}
class BatchProgressListener(conf: MonitorConfig, topic: String) extends SparkListener {
// Irrelevant code omitted
...
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
tasksCompleteCounter.inc()
}
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
val sumOfTasks = jobStart.stageInfos.foldLeft(0L) { (acc, i) => acc + i.numTasks }
totalTasksCounter.inc(sumOfTasks)
}
}
【讨论】: