【问题标题】:Perform Set Difference on RDDs in Spark Python在 Spark Python 中对 RDD 执行集差
【发布时间】:2015-12-27 05:00:31
【问题描述】:

我有两个 spark RDD,A 有 301,500,000 行,B 有 1,500,000 行。 B 中的那 150 万行也都出现在 A 中。我想要这两个 RDD 之间的设置差异,这样我返回 A 的 300,000,000 行,而 B 中的 1,500,000 行不再存在于 A 中。

我不能使用 Spark DataFrames。

这是我现在使用的系统。这些 RDD 有主键。我在下面做的是创建一个(收集的)出现在 B 中的主键列表,然后遍历 A 的主键以找到那些没有出现在 B 主键列表中的主键。

a = sc.parallelize([[0,"foo",'a'],[1,'bar','b'],[2,'mix','c'],[3,'hem', 'd'],[4,'line','e']])
b = sc.parallelize([[1,'bar','b'],[2,'mix','c']])
b_primary_keys = b.map(lambda x: x[0]).collect()  # since first col = primary key


def sep_a_and_b(row):
    primary_key = row[0]
    if(primary_key not in b_primary_keys):
        return(row)


a_minus_b = a.map(lambda x: sep_a_and_b(x)).filter(lambda x: x != None)

现在,这适用于这个示例问题,因为 A 和 B 很小。但是,当我使用我的真实数据集 A 和 B 时,这是不成功的。有没有更好(更并行)的方法来实现它?

【问题讨论】:

    标签: python apache-spark rdd set-difference


    【解决方案1】:

    这似乎是您可以通过 subtractByKey 解决的问题

    val filteredA = a.subtractByKey(b)
    

    要更改为键值:

    val keyValRDD = rdd.map(lambda x: (x[:1],x[1:]))
    

    *请注意,我的 python 很弱,可能有更好的方法来拆分值

    【讨论】:

    • 我在任何大型数据集上都经常收到错误ValueError: too many values to unpack(我也在 A = 2,000,000 行和 B = 6,000 行的数据集上尝试过此操作)。我的印象是编译器必须解包 A,它太大而无法保存在主内存中。
    • 正确。看来这个问题与大小无关,而是 RDD 有很多列的问题,而不仅仅是 (key,value) 对。
    • 你能映射成一个键值,其中键是第一项,值是数组吗?
    • 是的。我将更新我的问题以更准确地反映我的数据集。答案是从 [key, val1, val2, val3] 映射数据集 --> (key, [val1, val2, val3]),然后在转换后的数据上使用 subtractByKey
    猜你喜欢
    • 1970-01-01
    • 2017-02-07
    • 2021-06-04
    • 1970-01-01
    • 2017-08-08
    • 2016-09-27
    • 1970-01-01
    • 2020-10-07
    • 1970-01-01
    相关资源
    最近更新 更多