【发布时间】:2016-10-11 14:54:33
【问题描述】:
我想在我的combineByKey/reduceByKey/foldByKey 中包含依赖于当前正在操作的密钥的逻辑。从方法签名可以看出,传递给这些方法的唯一参数是组合/减少/折叠的值。
使用一个简单的示例,我只有一个 RDD,它是 (int, int) 元组,我想要的结果是一个由 tuple[0] 键入的 rdd,其中值是最接近键的 int。
例如:
(1, 8)
(1, 3)
(1, -1)
(2, 4)
(2, 5)
(2, 2)
(3, 2)
(3, 4)
应该减少到:
(1, 3)
(2, 2)
(3, 2)
注意比较(1, 3) 和(1, -1) 我不在乎选择哪一个,因为它们的距离相同。 “3”键也一样。
我想这样做的方式是:
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2)
但是reduce 函数只需要两个参数:两个要组合的值。似乎最简单的方法是引用减速器中的密钥以实现我的目标;这可能吗?
如果我尝试这个,我会得到一个错误:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect()
TypeError: () 只需要 3 个参数(给定 2 个)
我并不是真的在寻找这个示例问题的解决方案。我想知道是否有原因没有将密钥传递给reduceByKey 函数?我认为这是我缺少的 map-reduce 哲学的一些基本原则。
注意我可以通过插入一个映射步骤来解决我的示例,该步骤将每个值映射到一个由值和与键的距离组成的元组:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])])))
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap()
【问题讨论】: