【问题标题】:How can I group with multiple RDDs without aggregate the original RDD's partition?如何在不聚合原始 RDD 分区的情况下与多个 RDD 进行分组?
【发布时间】:2019-08-08 14:50:38
【问题描述】:

我有两个 RDD 有公共变量,格式如下:

 x = sc.parallelize([("A", 1), ("B", 4),("A",2)])
 y = sc.parallelize([("A", -1),("B", 5)])

然后我想使用公共变量与他们分组。 "A""B"

我尝试使用以下命令:

 z = [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
 print(z)

我得到的是

[('A', ([1, 2], [-1])), ('B', ([4], [5]))]

但是,我想要的是

[('A', ([1], [-1])), ('B', ([4], [5])),('A', ([2], [-1]))]

如何更改代码以获得如上所示的输出?谢谢。

【问题讨论】:

    标签: python pyspark rdd


    【解决方案1】:

    您可以通过直接连接来做到这一点:

    print(x.join(y).collect())
    #[('A', (1, -1)), ('A', (2, -1)), ('B', (4, 5))]
    

    如果您希望tuples 的元素为lists,请添加对mapValues 的调用:

    print(x.join(y).mapValues(lambda a: tuple([b] for b in a)).collect())
    #[('A', ([1], [-1])), ('A', ([2], [-1])), ('B', ([4], [5]))]
    

    【讨论】:

      猜你喜欢
      • 2016-01-19
      • 2016-05-07
      • 2019-03-20
      • 2019-04-16
      • 2014-08-31
      • 2015-05-26
      • 2015-06-18
      • 2016-01-27
      • 2019-06-13
      相关资源
      最近更新 更多