【发布时间】:2019-08-02 07:23:30
【问题描述】:
我正在尝试在mutable scala list 中添加元素,如下所示。我正在逐行读取数据框中的值,提取名称为“_title”的列的值并将其添加到列表中。但是当 for 循环完成时,列表仍然是空的。这是代码:
import scala.collection.mutable.ListBuffer
val flatK = dfR.withColumn("UserValue", explode(col("UserValue")))
var colListA = new ListBuffer[String]()
// var colSet : List[String] = List()
for(i <- 0 until Integer.parseInt(dfR.count().toString)){
flatK.filter($"columnIndex" === i).foreach{
r=>
val columnName = r.getAs[Row]("UserValue").getAs[String]("_title")
// println(columnName)
colListA.append(columnName)
}
}
println(columnName) 实际上打印了我想放入列表中的值。
我的数据框dfR 如下所示:
+--------------------------------------------------------------+-----------+
|UserValue |columnIndex|
+--------------------------------------------------------------+-----------+
|[, last_mod_date, 2009-01-14T13:40:53] |0 |
|[, object_string, SOLIDS] |0 |
|[, last_mod_date, 2009-01-13T22:58:30] |1 |
|[, object_string, TORSO] |1 |
当我这样做时
colListA += "elements"
colListA += "adds"
我可以看到添加的元素。但不在foreach 循环内。谁能告诉我我应该尝试什么?基本上,我希望 colList 填充 last_mod_date 和 object_string。
【问题讨论】:
-
这个问题以前被问过很多次了,但我找不到人们经常使用的优秀副本。无论如何,简短的回答:
foreach不是在 驱动程序 (您的缓冲区存在的地方) 中执行,而是在 executors (他们都有一个本地的缓冲区副本)。最后,每个副本都被修改了,但结果没有与驱动程序同步,因此主缓冲区保持为空。这个常见的新手错误是由于对 Sparks 架构的了解不足而造成的。我建议您阅读一些有关 spark 工作原理以及它们的用例的内容。 -
酷。但是我怎样才能在这里达到我想要的目的呢?如果它是重复的,您可以将我引导到该链接吗?
-
正如我所说的,我找不到确切的重复答案,但你可以很容易地找到很多类似的问题。现在,您如何做到这一点可能取决于您的实际用例。您可以简单地调用
collect以便在您的驱动程序中包含所有值(看起来像您想要的) - 请注意,如果DF很大,您可能只是记忆犹新. Spark 旨在处理一台机器无法容纳的大量数据,但由于您已经在本地模式下工作,因此仅用于调试就完成了。 -
好吧,我的数据框实际上可能非常大。因此,如果我确实收集,它也可能会耗尽内存。让我试着想出别的办法。
-
您正在尝试将列表中的每个字符串添加到 mutuable.ListBuffer。请纠正我?
标签: scala apache-spark foreach scala-collections