【问题标题】:Too many Accumulators in Spark JobSpark 作业中的累加器太多
【发布时间】:2016-06-27 11:25:48
【问题描述】:

我的 spark 应用有 40 个累加器

object MySparkApp { 
  def main(args: Array[String]): Unit = {
    // initialize SparkContext

    val acc1 = sc.accumulator(0)
    val acc2 = sc.accumulator(0)
    .
    .
    val acc40 = sc.accumulator(0)

    val logRdd = sc.textFile("input/path").map(x => parser.parse(x))
    logRdd.forEach(x => incrementCounter(x, acc1, acc2,..... acc40))
  }
}

这段代码非常丑陋,如果将这些累加器包装在对象之类的东西中并让代码更具可读性,这将是一种好方法。

【问题讨论】:

  • 使用List 的累加器?
  • 你的用途是什么?这似乎需要进一步分解?这可能只是一个自定义累加器吗?
  • 这些是 x 内的空值或无效值的计数,用于在作业结束时记录。

标签: scala apache-spark


【解决方案1】:

一种选择是为Map[String, Long] 类型实现一个累加器 - 然后为每次出现数据中的错误值:

累加器参数的实现:

class StringToLongAccumulatorParam extends AccumulatorParam[Map[String, Long]] {
  override def addInPlace(r1: Map[String, Long], r2: Map[String, Long]): Map[String, Long] = {
    // merging the maps:
    r1 ++ r2.map{ case (k,v) => k -> (v + r1.getOrElse(k,0L)) }
  }

  override def zero(initialValue: Map[String, Long]): Map[String, Long] = Map[String, Long]()
}

然后您可以通过使用此参数的实例创建 implicit val 然后创建和使用适当的累加器来使用它:

implicit val accParam = new StringToLongAccumulatorParam()
val accumulator = sc.accumulator[Map[String, Long]](Map[String, Long]())
val rdd2 = rdd.map(v => { accumulator += Map("FieldName" -> 1); v })

当然 - 将 "FieldName" 更改为您需要的任何内容。对于每条记录,您可以创建一个包含任意数量条目的映射,然后使用+= 将其添加到累加器中。

注意:如果您有很多这些错误值,我不确定这是否会表现得如此出色 - 但如果您的大多数记录最终不会创建这些地图,那么应该是微不足道。如果大多数记录确实有 null/bad 值,也许这应该通过累加器完成,而是通过实际的 RDD 操作(映射到值错误的 1s 并减少?)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-01-18
    • 2016-09-30
    • 1970-01-01
    • 1970-01-01
    • 2017-04-24
    • 2019-08-07
    • 1970-01-01
    相关资源
    最近更新 更多