【问题标题】:How to accumulate results in a Seq for later processing in spark?如何在 Seq 中累积结果以供以后在 spark 中处理?
【发布时间】:2019-04-12 16:50:25
【问题描述】:

我正在尝试处理 Spark 数据帧中的每一行,并将其转换为另一个数据帧。本质上,我有一个框架 A,其中包含一列(“id”)和另一列是句子数组。我想将其转换为另一个数据帧,每个句子都用“docID:count”标识符字符串唯一标识。我的代码是:

var sentencesCollection:Seq[SentenceIdentifier] = Seq()
tokenized.foreach(row => {
    val docID = row.getAs[String]("id")
    val sentences = row.getAs[Seq[String]]("sentences")
    var count:Integer = 0
    for (elem <- sentences) {
        val sentenceID:String = docID + ":" + count
        count = count + 1

        val si = SentenceIdentifier(sentenceID, elem)
        sentencesCollection = sentencesCollection :+ si
     }

})

println(sentencesCollection.length)

但是,println 语句打印“0”。

知道我怎样才能让 sentenceCollection 成为我可以在下游进一步处理的序列吗? (可能认为是 .toDF() 调用)。

【问题讨论】:

  • 那行不通,因为foreach 是由 Executors (可能在另一个 jvm 的另一台机器上) 执行的,而你的 Seq只存在于 Driver 中。这已经被问过无数次了。如果您想拥有数据的本地副本,请使用collect。 - 但是,由于您要做的是创建一个新的DataFrame,因此创建本地集合会浪费内存和性能瓶颈。有很多方法可以从另一个DataFrame 创建一个新的DataFrame,例如选择、过滤或映射(在Dataset。我认为您应该阅读更多关于 Spark 的工作原理

标签: scala apache-spark dataframe


【解决方案1】:

正如@Luis Miguel Mejía Suárez 在评论中很好解释的那样,作为DataFrame.foreach 的参数传递的任何函数都将在一台或多台执行器机器上执行,而不是在运行此代码的驱动程序上执行,因此对可变状态的任何更改都将是丢失(它将在 executors 上执行并丢弃)。

使用 DataFrames 时,您应该始终考虑将一个 DF 转换为另一个 DF,只使用 Spark 的 API 即可。这些转换是 Spark 的“指令”,由其分布式执行。

在这种情况下,考虑到这一点就可以实现您的要求。你想:

  • 分解记录,这意味着将包含数组的每条记录转换为多条记录,每条记录都包含数组中的一个元素
  • 跟踪爆炸元素在数组中的位置
  • Concat 以“:”作为分隔符的“id”列的现有值的位置

这些操作中的每一项都可以通过 Spark 的一个函数来实现,该函数旨在在 DataFrame 列上执行。解决方案如下所示:

import org.apache.spark.sql.functions._
import spark.implicits._

// Sample data
val tokenized = Seq(
  (1, Array("Hi there", "Hello there")),
  (2, Array("Bye now")),
  (3, Array("Thank you", "Thanks", "Many thanks"))
).toDF("id", "sentences")

val result = tokenized
   // we'll use posexplode function which creates "pos" and "col" columns
  .select($"id", posexplode($"sentences")) 
   // we'll create a new docID column using concat function, and rename "col"
  .select(concat($"id", lit(":"), $"pos") as "docID", $"col" as "sentence")

result.show()
// +-----+-----------+
// |docID|   sentence|
// +-----+-----------+
// |  1:0|   Hi there|
// |  1:1|Hello there|
// |  2:0|    Bye now|
// |  3:0|  Thank you|
// |  3:1|     Thanks|
// |  3:2|Many thanks|
// +-----+-----------+

【讨论】:

  • 非常感谢!这很好用。我肯定也会更多地了解 Spark 的基础知识。
猜你喜欢
  • 1970-01-01
  • 2019-03-31
  • 1970-01-01
  • 2012-11-08
  • 2013-07-18
  • 1970-01-01
  • 2019-01-08
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多