【问题标题】:Updating an RDD based on value of the other RDD根据另一个 RDD 的值更新一个 RDD
【发布时间】:2016-12-16 21:10:54
【问题描述】:

我想根据另一个 rdd 的值更新一个 rdd。我试过这三种方法: 1.使用左连接 2.先用key减法,再用union 3. 使用地图和if条件

但是上面提到的三种方法都太慢了。

这是一个例子: rdd1 包含一个基于我拥有的不同 userID 和 productID 的 rdd。例如,如果我有从 0 到 100 的用户 ID,我有从 0 到 100 的产品 ID。我最初必须为所有这些用户评分为 0。 rdd1 = [(1,1,0.0),(1,2,0.0),(1,3,0.0),...,(100,100,0.0)]

然后 rdd2 包含特定 userIds 和 productIds 的评级。 rdd2 = [(1,1,3.0),(100,100,4.0)]

我想要的是将所有 userIds 和 productIds 包含在矩阵中以进行协同过滤,即使没有与之对应的评级。我需要这样做才能在 Spark MLLib 中使用显式 ALS。如果我不打算增加 0,我将得到无意义的结果,因为显式代码不包括存在未观察值的场景。因此,它们被认为是缺失而不是零。

简而言之,我想生成这个 rdd: rdd = [(1,1,3.0),(1,1,0.0),(1,2,0.0),...,(100,100,4.0)]

您对在运行时间方面最快的方法有什么想法吗?我有两个 rdd,其中包含数百万个条目用于更新。

【问题讨论】:

  • 请分享您编写的实际代码。你如何选择应该为每个键设置哪个值?
  • 你能提供一个更好的例子吗?你的例子模棱两可。
  • 第一个 rdd 我的所有值都是 0。我正在做的是将 0 的条目附加到第二个 rdd。我必须根据 rdd2 的值更新 rdd1 的值。如果 rdd1 中的键在 rdd2 中找到,我必须使用该值而不是 0。

标签: scala apache-spark functional-programming apache-spark-mllib


【解决方案1】:

你可以这样做:

val res: RDD[(Integer, Integer)] = 
  rdd1.leftOuterJoin(rdd2)
      .mapValues { case (v, wOpt) => wOpt.getOrElse(v) }

【讨论】:

  • leftOuterJoin 对于 spark 1.5.1 来说非常慢,这是在使用 sortMerge 而不是 leftJoin(使用 crossJoin 和过滤)的 spark 1.6 更改之前。如果我不打算使用 leftOuterJoin,是否有另一种更快的方法可以做到这一点。我在我的 rdds 中使用了数百万个条目。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-06-17
  • 2016-08-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多