【问题标题】:Join on two RDDs using Scala in Spark在 Spark 中使用 Scala 加入两个 RDD
【发布时间】:2017-06-27 15:09:54
【问题描述】:

我正在尝试在 Spark 上实现本地异常因子。所以我有一组从文件中读取的点,然后为每个点找到 N 个最近的邻居。每个点都有一个使用 zipWithIndex() 命令赋予它的索引

所以现在我有两个 RDD 首先

RDD[(Index:Long, Array[(NeighborIndex:Long, Distance:Double)])]

其中 Long 表示它的索引,Array 由它的 N 个最近邻居组成,Long 表示这些邻居的索引位置,Double 表示它们与给定点的距离

第二

RDD[(Index:Long,LocalReachabilityDensity:Double)]

这里,Long 再次表示给定点的 Index,Double 表示其 Local Reachability 密度

我想要的是一个 RDD,它包含所有点,以及它们的 N 个最近邻居及其本地可达性密度的数组

RDD[(Index:Long, Array[(NeighborIndex:Long,LocalReachabilityDensity:Double)])]

所以基本上在这里,Long 表示一个点的索引,而数组将是它的 N 个最近邻居,以及它们的索引值和本地可达性密度。

根据我的理解,我需要在第一个 RDD 上运行一个映射,然后将其数组中的值与包含本地可达性密度的第二个 RDD 连接,以获得其所有给定索引的本地可达性密度N 个邻居。但我不确定如何实现这一目标。如果有人可以帮助我,那就太好了

【问题讨论】:

  • 第二个和第三个加入,然后是笛卡尔和第一个,过滤掉远邻

标签: arrays scala join apache-spark


【解决方案1】:

给定:

val rdd1: RDD[(index: Long, Array[(neighborIndex: Long, distance: Double)])] = ...
val rdd2: RDD[(index: Long, localReachabilityDensity: Double)] = ...

我真的不喜欢使用 Scala 的Array。我也不喜欢你的抽象是跨目的的。换句话说,rdd2 中的index 被埋在rdd1 中的各种条目中。这使得事情变得难以推理,并且还引发了 Spark RDD API 的限制,即您在转换第一个 RDD 时无法访问第二个 RDD。我相信你应该重写你当前的工作,以产生更容易使用的抽象。

但如果你必须:

val flipped = rdd1.map { 
  case (index, array) => 
    array.map {
      case (neighborIndex, distance) => (neighborIndex, (index, distance))
    }.elements.toVector
}.flatMap(identity)
 .groupBy(_._1)
val result = flipped.join(rdd2).mapValues {
   case (indexDistances, localReachabilityDensity) => 
      indexDistances.map {
         case (index, _) => (index, localReachabilityDensity)
      }    
}

基本思想是翻转rdd1 以将neighborIndex 值“提取”到顶层作为PairRDD 的键,然后允许我使用rdd2 执行join。并将Array 替换为Vector。一旦你对相同的索引进行了连接,组合起来就容易多了。

请注意,这不是我的想法,可能并不完美。这个想法并不是为您提供复制粘贴的解决方案,而是建议一个不同的方向。

【讨论】:

  • 我在 .head 值上遇到错误 head is not a member of org.apache.spark.rdd.RDD[(Long, Double)] 据我了解,您正在使用 head 来获取第一个元素过滤结果,然后得到它的第二个元素(局部可达性密度)。我尝试使用 first() 代替,但得到一个错误,即不能在转换中调用转换或动作......所以有什么解决方案吗?
  • 所以我决定只移除头部来存储所有值。我遇到了一个错误,你不能在转换中进行转换。该代码在 mapValues 中有一个映射,它给出了错误org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations
  • 即使使用查找我们也会得到相同的错误org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;
  • 您遇到了我不知道的 RDD 限制,因为我从未尝试过您正在尝试做的事情。也许我的最新建议会为您指明正确的方向。最主要的是重新定位你的抽象。
猜你喜欢
  • 2016-09-07
  • 2018-11-24
  • 2016-01-24
  • 2019-03-25
  • 2020-09-06
  • 2015-10-18
  • 2021-09-08
  • 2018-06-01
  • 2017-07-30
相关资源
最近更新 更多