【发布时间】:2019-05-10 03:06:43
【问题描述】:
在探索 Spark 累加器时,我试图了解和展示 Spark 中累加器和常规变量之间的区别。但输出似乎与我的预期不符。我的意思是累加器和计数器在程序结束时都具有相同的值,并且能够在转换函数中读取累加器(根据文档,只有驱动程序可以读取累加器)。难道我做错了什么?我的理解正确吗?
object Accmulators extends App {
val spark = SparkSession.builder().appName("Accmulators").master("local[3]").getOrCreate()
val cntAccum = spark.sparkContext.longAccumulator("counterAccum")
val file = spark.sparkContext.textFile("resources/InputWithBlank")
var counter = 0
def countBlank(line:String):Array[String]={
val trimmed = line.trim
if(trimmed == "") {
cntAccum.add(1)
cntAccum.value //reading accumulator
counter += 1
}
return line.split(" ")
}
file.flatMap(line => countBlank(line)).collect()
println(cntAccum.value)
println(counter)
}
输入文件的文本中间有 9 个空行
4) Aggregations and Joins
5) Spark SQL
6) Spark App tuning
输出:
counter 和 cntAccum 给出相同的结果。
【问题讨论】: