【问题标题】:How to zip after distnct in pySpark如何在pySpark中区分后压缩
【发布时间】:2015-03-30 21:36:14
【问题描述】:

以下程序在 zip 步骤中失败。

x = sc.parallelize([1, 2, 3, 1, 2, 3])
y = sc.parallelize([1, 2, 3])
z = x.distinct()
print x.zip(y).collect()

产生的错误取决于是否指定了多个分区。

我明白了

两个 RDD [必须] 具有相同数量的分区和每个分区中相同数量的元素。

解决此限制的最佳方法是什么?

我一直在用下面的代码执行操作,但我希望能找到更高效的东西。

def safe_zip(left, right):
    ix_left = left.zipWithIndex().map(lambda row: (row[1], row[0]))
    ix_right = right.zipWithIndex().map(lambda row: (row[1], row[0]))
    return ix_left.join(ix_right).sortByKey().values()

【问题讨论】:

  • 我认为这是XY problemzip 从其 Python 上下文中假设迭代之间存在顺序。对于 RDD,没有固有的顺序。但是,创建密钥并通过这些密钥加入可能是您正在寻找的,就像您在 safe_zip 中所做的那样。请注意,那里的sortByKey 操作似乎是多余的。通过解释您正在尝试做什么而不是提及如何您正在这样做,您可能会得到更好的答案。

标签: pyspark


【解决方案1】:

我认为这可以通过在你的 RDD 上使用 cartesian() 来完成

import pyspark
x = sc.parallelize([1, 2, 3, 1, 2, 3])
y = sc.parallelize([1, 2, 3])
x.distinct().cartesian(y.distinct()).collect()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-03-02
    相关资源
    最近更新 更多