【发布时间】:2016-09-20 19:45:57
【问题描述】:
如何在 pyspark 中将 groupbyKey 转换为 reduceByKey。我附上了一个sn-p。这将为每个区域部门周组合应用一个 corr。我使用了 groupbyKey,但它非常慢并且随机错误(我有 10-20GB 的数据,每个组将有 2-3GB)。请帮助我使用 reduceByKey 重写它
数据集
region dept week val1 valu2
US CS 1 1 2
US CS 2 1.5 2
US CS 3 1 2
US ELE 1 1.1 2
US ELE 2 2.1 2
US ELE 3 1 2
UE CS 1 2 2
输出
region dept corr
US CS 0.5
US ELE 0.6
UE CS .3333
代码
def testFunction (key, value):
for val in value:
keysValue = val.asDict().keys()
inputpdDF.append(dict([(keyRDD, val[keyRDD]) for keyRDD in keysValue])
pdDF = pd.DataFrame(inputpdDF, columns = keysValue)
corr = pearsonr(pdDF['val1'].astype(float), pdDF['val1'].astype(float))[0]
corrDict = {"region" : key.region, "dept" : key.dept, "corr": corr}
finalRDD.append(Row(**corrDict))
return finalRDD
resRDD = df.select(["region", "dept", "week", "val1", "val2"])\
.map(lambda r: (Row(region= r.region, dept= r.dept), r))\
.groupByKey()\
.flatMap(lambda KeyValue: testFunction(KeyValue[0], list(KeyValue[1])))
【问题讨论】:
-
reduceByKey 在某些方面与 groupByKey 不同,但主要是聚合之间的区别 - groupby 产生 (key,
) 而 reduce 产生 (key,aggregate 例如 的总和) .因此,从一个重写到另一个意味着了解我们如何对数据使用单通道(聚合器)函数。请注意,我没有费心查看您的“测试功能”。 -
@Chinny84 抱歉,我之前错过了所需的输出格式。是否可以指导我采用替代方法?
标签: apache-spark pyspark