【问题标题】:Flattening the key of a RDD展平 RDD 的键
【发布时间】:2016-08-18 15:48:23
【问题描述】:

我有一个 (Array[breeze.linalg.DenseVector[Double]], breeze.linalg.DenseVector[Double]) 类型的 Spark RDD。我希望将它的键变平以将其转换为breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double]) 类型的 RDD。我目前正在做:

val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))

anonymousOrdering() 的签名是String => (Array[DenseVector[Double]], DenseVector[Double])

它返回type mismatch: required: TraversableOnce[?]。做同样事情的 Python 代码是:

newRDD = oldRDD.flatMap(lambda point: [(tile, point) for tile in anonymousOrdering(point)])

如何在 Scala 中做同样的事情?我一般用flatMapValues但是这里我需要把key压扁。

【问题讨论】:

  • 你能指定anonymousOrdering的签名吗?在展平 RDD 的类型之后,您的问题也是相同的。这是故意的吗?
  • 签名添加(第一个sn-p中的注释),我的意图是将包含(Array(1, 2), 3)的RDD转换为包含(1, 3)的RDD | (2, 3)。在此示例中,我已将类型 DenseVector 替换为整数。

标签: scala apache-spark flatmap


【解决方案1】:

如果我正确理解你的问题,你可以这样做:

val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob))
// newRDD is RDD[(Array[DenseVector], DenseVector)]

在这种情况下,您可以使用模式匹配和 for/yield 语句“展平”元组的 Array 部分:

newRDD = newRDD.flatMap{case (a: Array[DenseVector[Double]], b: DenseVector[Double]) => for (v <- a) yield (v, b)}
// newRDD is RDD[(DenseVector, DenseVector)]

虽然我仍然不清楚你想在哪里/如何使用groupByKey

【讨论】:

  • 我正在删除地图末尾的 groupByKey(),因为它与问题无关。感谢您的回答。
  • val newRDD = oldRDD.flatMap(ob => anonymousOrdering(ob)) 返回found : (Array[breeze.linalg.DenseVector[Double]], breeze.linalg.DenseVector[Double]), required: TraversableOnce[?]
  • 看起来问题出在anonymousOrdering 内部,然后...请参阅此处:stackoverflow.com/questions/30833618/…
  • 链接中使用的方法不适用于RDD,只有DataFrame。
  • 这是一个参考链接,而不是解决方案——我无法仅通过查看类型签名和错误来调试您的 anonymousOrdering 函数。您最初的问题是关于展平键,也许重新接受这个并提出一个新问题?
【解决方案2】:

更改代码以使用 Map 而不是 FlatMap:

val newRDD = oldRDD.map(ob => anonymousOrdering(ob)).groupByKey()

如果anonymousOrdering 返回一个元组列表并且您希望它扁平化,那么您只想在此处使用flatmap。

【讨论】:

  • Mapping 会返回一个 RDD 类型(Array[breeze.linalg.DenseVector[Double]],bre​​eze.linalg.DenseVector[Double]),我想展平元组的第一部分。
  • 能把anonymousOrdering的函数签名贴出来吗?
  • 问题更新,现在在第一个sn-p下面。
【解决方案3】:

由于anonymousOrdering() 是您的代码中的一个函数,请更新它以返回Seq[(breeze.linalg.DenseVector[Double], breeze.linalg.DenseVector[Double])]。这就像(tile, point) for tile in anonymousOrdering(point)] 一样,但直接在匿名函数的末尾。然后flatMap 将为序列的每个元素创建一个分区。

作为一般规则,避免将集合作为 RDD 中的键。

【讨论】:

  • 您询问如何展平密钥,我回答并且您接受了我的回答,但随后您为您的代码做了一个变通方法,这样您就不必再展平密钥了,并接受了您的变通方法作为“解决方案”...您也从未发布过有实际问题的anonymousOrdering 的内容。糟糕的形式!
  • 我再次接受了您的回答,但没有奏效,真正的答案是没有以数组为键的 RDD。
猜你喜欢
  • 2016-06-24
  • 2014-12-17
  • 2017-08-25
  • 2019-11-28
  • 2018-11-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多