【问题标题】:RDD Remove elements by keyRDD 按键删除元素
【发布时间】:2016-04-27 14:32:18
【问题描述】:

我有 2 个使用以下代码提取的 RDD:

val fileA = sc.textFile("fileA.txt")
val fileB = sc.textFile("fileB.txt")

然后我通过键映射和减少它:

val countsB = fileB.flatMap(line => line.split("\n"))
  .map(word => (word, 1))
  .reduceByKey(_+_)

val countsA = fileA.flatMap(line => line.split("\n"))
  .map(word => (word, 1))
  .reduceByKey(_+_)

如果 countA 中存在键,我现在不想查找并删除 countB 中的所有键

我尝试过类似的方法:

countsB.keys.foreach(b => {
  if(countsB.collect().exists(_ == b)){
    countsB.collect().drop(countsB.collect().indexOf(b))
  }
})

但它似乎并没有通过密钥删除它们。

【问题讨论】:

标签: scala apache-spark rdd


【解决方案1】:

您建议的代码存在 3 个问题:

  1. 您正在collecting RDD,这意味着它们不再是 RDD,它们作为普通 Scala 集合被复制到驱动程序应用程序的内存中,因此您会失去 Spark 的并行性并冒着 OutOfMemory 错误的风险,以防您的数据集大

  2. 在不可变的 Scala 集合(或 RDD)上调用 drop 时,您不会更改原始集合,您会得到一个 new 集合,其中删除了这些记录,所以你不能指望原始收藏会改变

  3. 您不能在传递给任何 RDD 高阶方法的函数中访问 RDD(例如,在这种情况下为 foreach) - 传递给这些方法的任何函数都被序列化并发送给工作人员,并且RDDs (故意)不可序列化 - 将它们提取到驱动程序内存中、序列化它们并发送回工作人员是没有意义的 - 数据已经分布在工作人员身上!

要解决所有这些问题 - 当您想使用一个 RDD 的数据来转换/过滤另一个 RDD 时,您通常需要使用某种类型的 join。在这种情况下,您可以这样做:

// left join, and keep only records for which there was NO match in countsA:
countsB.leftOuterJoin(countsA).collect { case (key, (valueB, None)) => (key, valueB) }

请注意,我在这里使用的 collect 不是您使用的 collect - 这个使用 PartialFunction 作为参数,其行为类似于 mapfilter 的组合,最重要的是:它不会将所有数据复制到驱动程序内存中。

编辑:正如原型 Paul 评论的那样 - 你有一个更短更好的选择 - subtractByKey

countsB.subtractByKey(countsA)

【讨论】:

  • 这是一个非常好的答案。谢谢。
  • “你通常想使用某种类型的连接。”。一般情况下是这样,但肯定在这里你想要subtractByKey
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-11-21
  • 2017-10-19
  • 2018-06-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多