【问题标题】:Why can't I reference key in reduce logic?为什么我不能在 reduce 逻辑中引用 key?
【发布时间】:2016-10-11 14:54:33
【问题描述】:

我想在我的combineByKey/reduceByKey/foldByKey 中包含依赖于当前正在操作的密钥的逻辑。从方法签名可以看出,传递给这些方法的唯一参数是组合/减少/折叠的值。

使用一个简单的示例,我只有一个 RDD,它是 (int, int) 元组,我想要的结果是一个由 tuple[0] 键入的 rdd,其中值是最接近键的 int

例如:

(1, 8)
(1, 3)
(1, -1)
(2, 4)
(2, 5)
(2, 2)
(3, 2)
(3, 4)

应该减少到:

(1, 3)
(2, 2)
(3, 2)

注意比较(1, 3)(1, -1) 我不在乎选择哪一个,因为它们的距离相同。 “3”键也一样。

我想这样做的方式是:

rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2)

但是reduce 函数只需要两个参数:两个要组合的值。似乎最简单的方法是引用减速器中的密钥以实现我的目标;这可能吗?

如果我尝试这个,我会得到一个错误:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect()

TypeError: () 只需要 3 个参数(给定 2 个)

我并不是真的在寻找这个示例问题的解决方案。我想知道是否有原因没有将密钥传递给reduceByKey 函数?我认为这是我缺少的 map-reduce 哲学的一些基本原则。


注意我可以通过插入一个映射步骤来解决我的示例,该步骤将每个值映射到一个由值和与键的距离组成的元组:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])])))
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap()

【问题讨论】:

    标签: python mapreduce pyspark


    【解决方案1】:

    我认为没有充分的理由不传递密钥。
    但是,我觉得reduceByKey API 是为常见用例设计的——计算每个键的值的总和。到目前为止,我从不需要值计算中的键。但这只是我的看法。

    您解决的问题似乎也是简单的聚合问题。 min()groupByKey 可以找到答案。我知道您不是在寻找解决方案,但这是我的写作方式。

    from pyspark import SparkContext
    
    sc = SparkContext()
    rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
    reduced = rdd.groupByKey().map(lambda (k, v): (k, min(v, key=lambda e:abs(e-k))))
    print(reduced.collectAsMap())
    

    结果

    {1: 3, 2: 2, 3: 2}
    

    【讨论】:

    • 不错的答案。我的问题的真正答案很可能只是“因为那不是 API”。但我还是想知道。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2010-09-30
    • 1970-01-01
    • 1970-01-01
    • 2014-04-10
    • 1970-01-01
    • 1970-01-01
    • 2012-04-27
    相关资源
    最近更新 更多