【问题标题】:Comparing two RDDs比较两个 RDD
【发布时间】:2016-10-24 11:16:06
【问题描述】:

我有两个 RDD[Array[String]],我们称它们为 rdd1 和 rdd2。 我将创建一个新的 RDD,其中仅包含 rdd2 的条目,而不是 rdd1 (基于键)。 我通过 Intellij 在 Scala 上使用 Spark。

我将 rdd1 和 rdd2 按一个键分组(我将只比较两个 rdds 的键):

val rdd1Grouped = rdd1.groupBy(line => line(0))
val rdd2Grouped = rdd2.groupBy(line => line(0))

然后,我用了leftOuterJoin

val output = rdd1Grouped.leftOuterJoin(rdd2Grouped).collect {
  case (k, (v, None)) => (k, v)
}

但这似乎没有给出正确的结果。

它有什么问题?有什么建议吗?

RDDS 示例(每一行都是一个 Array[String],ofc):

rdd1                        rdd2                  output (in some form)

1,18/6/2016               2,9/6/2016                  2,9/6/2016
1,18/6/2016               2,9/6/2016 
1,18/6/2016               2,9/6/2016
1,18/6/2016               2,9/6/2016
1,18/6/2016               1,20/6/2016
3,18/6/2016               1,20/6/2016 
3,18/6/2016               1,20/6/2016
3,18/6/2016
3,18/6/2016
3,18/6/2016

在这种情况下,我只想添加条目“2,9/6/2016”,因为键“2”不在 rdd1 中。

【问题讨论】:

    标签: scala apache-spark compare rdd


    【解决方案1】:

    新的 RDD 仅包含 rdd2 的条目,不在 rdd1 中

    left join 将保留 rdd1 中的所有键并附加 RDD2 匹配键值的列。所以显然左连接/外连接不是解决方案。

    rdd1Grouped.subtractByKey(rdd2Grouped) 适合您的情况。

    附: : 另请注意,如果 rdd1 较小,则更好地广播它。这样,在减法时只会流式传输第二个 rdd。

    【讨论】:

    • 由于rdd1rdd2 不是元组的RDD,因此不能按原样调用subtractByKey。您可能错过了对keyBy 的一些呼叫,例如rdd1.keyBy(_(0)).subtractByKey(rdd2.keyBy(_(0))).values
    • 正确,我应该使用 rdd1Grouped 和 rdd2Grouped,而不仅仅是 rdd1 和 rdd2。现在在我的回答中更正。
    • 实际上分组是多余的(而且通常很昂贵),它改变了结果类型——我认为keyBy在这里更有意义。
    • 你是对的,但是,如果你想要相同键的值列表分组是最好的选择。
    • 它不执行任何缩减,也不过滤重复项 - rdd.keyBy(f) 等效于 rdd.map(v => (f(v), v)) - 它使用给定函数将每个值转换为 (key, value) 的元组创建密钥
    【解决方案2】:

    切换rdd1Groupedrdd2Grouped,然后使用filter

    val output = rdd2Grouped.leftOuterJoin(rdd1Grouped).filter( line => {
      line._2._2.isEmpty
    }).collect
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-08-17
      • 1970-01-01
      • 1970-01-01
      • 2016-04-08
      • 1970-01-01
      • 1970-01-01
      • 2017-07-15
      相关资源
      最近更新 更多