【问题标题】:Which function in spark is used to combine two RDDs by keysspark中哪个函数用于通过key组合两个RDD
【发布时间】:2015-01-10 13:09:26
【问题描述】:

假设我有以下两个 RDD,具有以下密钥对值。

rdd1 = [ (key1, [value1, value2]), (key2, [value3, value4]) ]

rdd2 = [ (key1, [value5, value6]), (key2, [value7]) ]

现在,我想通过键值加入它们,所以例如我想返回以下内容

ret = [ (key1, [value1, value2, value5, value6]), (key2, [value3, value4, value7]) ] 

我该如何使用 Python 或 Scala 在 spark 中做到这一点?一种方法是使用 join,但 join 会在元组内创建一个元组。但我希望每个键值对只有一个元组。

【问题讨论】:

    标签: python scala apache-spark rdd


    【解决方案1】:

    只需使用join 然后map 生成的rdd。

    rdd1.join(rdd2).map(case (k, (ls, rs)) => (k, ls ++ rs))
    

    【讨论】:

    • 我有一个总数的 rdd 和计数的 rdd。我将如何通过相同的键加入它们以创建平均值。接受我做错的可能性。
    • 这应该是一个单独的问题,但是:如果你有 values: RDD[(K, Float)]counts: RDD[(K, Int)] (如果没有,则将它们映射成这个形状)然后你可以用 values.join(counts) 得到一个 @ 987654327@, map 离开 K,然后你可以做平均 - 可能已经有一个函数了,但是假设我的数学是正确的,困难的方法是 reduce {case ((v1, count1), (v2, count2)) => ((v1 * count1 + v2 * count2) / (count1 + count2), (count1 + count2))}
    • 是的,这就是最终的解决方案。谢谢!
    【解决方案2】:

    我会将两个 RDD 合并到一个 reduceByKey 来合并这些值。

    (rdd1 union rdd2).reduceByKey(_ ++ _)
    

    【讨论】:

    • @Sai 同样的事情,对吧?不确定是指“未找到”
    • Sai 的语法适用于我的架构。可能是特定于版本的。
    • 只是为了确定,这是一个 Scala 语法,而不是 Python,所以如果你在 Python 中尝试这个,它会抛出一个错误
    猜你喜欢
    • 1970-01-01
    • 2017-02-16
    • 2015-06-15
    • 1970-01-01
    • 1970-01-01
    • 2015-10-18
    • 1970-01-01
    • 2014-11-20
    • 2017-06-22
    相关资源
    最近更新 更多