【问题标题】:Can we use outer map object in spark.map function我们可以在 spark.map 函数中使用外部地图对象吗
【发布时间】:2015-10-12 18:21:17
【问题描述】:

我是 Scala 和函数式编程的新手。我有以下火花代码 sn-p:

case class SPR(symbol:String, splitOrg:Double, splitAdj:Double, timeStamp: String, unx_tt: Int)

var oldFct = 15.0
val splitMap = collection.mutable.Map[String, Double]()

val tmp = splitsData.map{ row=>
    var newFct = 1.0;
    var sym = row(0).toString;
    oldFct = splitMap.getOrElse(sym, 1.0)
    newFct = row(12).toString.toDouble * oldFct
    splitMap += (sym->newFct)
    SPR(row(0).toString, row(12).toString.toDouble, newFct, row(10).toString, row(13).toString.toInt)
}.collect()

println("MAP ===========" + splitMap.size)

根据我的观察,我可以在块内使用原始数据类型,但在 Map 对象的情况下,我总是将大小设为 0。所以似乎没有添加键值对。

提前致谢。

【问题讨论】:

  • 请提供一个可重现的最小示例,以便我们提供帮助!这和spark有什么关系??

标签: scala mapreduce apache-spark scala-collections


【解决方案1】:

阅读 Spark 文档中的 Understanding closures。最相关的部分(只需将counter 替换为您的splitMap):

在其范围之外修改变量的 RDD 操作可能是一个常见的混淆源...

主要挑战是上述代码的行为是未定义的。在具有单个 JVM 的本地模式下,上述代码将对 RDD 中的值求和并将其存储在计数器中。这是因为 RDD 和变量 counter 在驱动节点上的内存空间相同。

但是,在集群模式下,发生的事情会更加复杂,上述操作可能无法按预期工作。为了执行作业,Spark 将 RDD 操作的处理分解为任务——每个任务都由一个执行器操作。在执行之前,Spark 会计算闭包。闭包是那些必须对执行程序可见的变量和方法,以便在 RDD 上执行其计算(在本例中为 foreach())。这个闭包被序列化并发送到每个执行器。在本地模式下,只有一个执行者,所以一切都共享同一个闭包。然而,在其他模式下,情况并非如此,并且在单独的工作节点上运行的执行程序每个都有自己的闭包副本。

这里发生的情况是,发送给每个执行器的闭包中的变量现在是副本,因此,当在 foreach 函数中引用计数器时,它不再是驱动程序节点上的计数器。驱动程序节点的内存中仍有一个计数器,但执行程序不再可见!执行者只能看到来自序列化闭包的副本。因此,counter 的最终值仍然为零,因为对 counter 的所有操作都引用了序列化闭包中的值。

为了确保在这些场景中明确定义的行为,应该使用累加器。 Spark 中的累加器专门用于提供一种机制,用于在集群中的工作节点之间拆分执行时安全地更新变量。本指南的累加器部分更详细地讨论了这些。

一般来说,闭包——诸如循环或局部定义的方法之类的结构,不应该用于改变一些全局状态。 Spark 不定义或保证从闭包外部引用的对象的突变行为。一些这样做的代码可能在本地模式下工作,但这只是偶然,这样的代码在分布式模式下不会像预期的那样运行。如果需要一些全局聚合,请改用 Accumulator。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-10-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多