【问题标题】:Spark : Difference between accumulator and local variableSpark:累加器和局部变量之间的区别
【发布时间】:2019-05-10 03:06:43
【问题描述】:

在探索 Spark 累加器时,我试图了解和展示 Spark 中累加器和常规变量之间的区别。但输出似乎与我的预期不符。我的意思是累加器和计数器在程序结束时都具有相同的值,并且能够在转换函数中读取累加器(根据文档,只有驱动程序可以读取累加器)。难道我做错了什么?我的理解正确吗?

object Accmulators extends App {

  val spark = SparkSession.builder().appName("Accmulators").master("local[3]").getOrCreate()

  val cntAccum = spark.sparkContext.longAccumulator("counterAccum")

  val file = spark.sparkContext.textFile("resources/InputWithBlank")

  var counter = 0

  def countBlank(line:String):Array[String]={
    val trimmed = line.trim
    if(trimmed == "") {
      cntAccum.add(1)
      cntAccum.value //reading accumulator
      counter += 1
    }
    return line.split(" ")
  }

  file.flatMap(line => countBlank(line)).collect()

  println(cntAccum.value)

  println(counter)
}

输入文件的文本中间有 9 个空行

4) Aggregations and Joins

5) Spark SQL

6) Spark App tuning

输出:

counter 和 cntAccum 给出相同的结果。

【问题讨论】:

    标签: apache-spark accumulator


    【解决方案1】:

    counter 是局部变量,可能正在您的本地程序.master("local[3]") 中工作,它将在驱动程序上执行。假设您正在运行yarn 模式。那么所有逻辑都将以分布式方式工作,您的局部变量不会被更新(因为它的本地变量会被更新)但累加器会被更新。因为它的分布变量。假设您有 2 个执行程序运行程序...一个执行程序将更新,另一个执行程序可以看到最新值。 在这种情况下,您的cntAccum 能够以纱线分布式模式从其他执行者那里获取最新值。其中作为局部变量counter 不能...

    自从accumulators are read and write. see docs here.

    图像中的执行者 id 是 localhost。如果您使用带有 2-3 个执行器的纱线,它将显示执行器 ID。希望对您有所帮助..

    【讨论】:

    • Gurupraveen :有用吗?如果是,请注意,请接受答案并投票
    猜你喜欢
    • 2022-06-16
    • 2015-10-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-10-27
    • 2012-10-06
    • 2011-05-24
    • 2017-12-14
    相关资源
    最近更新 更多