【发布时间】:2018-10-28 13:02:18
【问题描述】:
我是 PySpark 的新手,正在为简单的数据框操作而苦苦挣扎。我有一个类似的数据框:
product period rating product_Desc1 product_Desc2 ..... more columns
a 1 60 foo xx
a 2 70 foo xx
a 3 59 foo xx
b 1 50 bar yy
b 2 55 bar yy
c 1 90 foo bar xy
c 2 100 foo bar xy
我想按产品分组,添加列以计算评分的算术、几何和谐波平均值同时保持数据框中的其余列,这些列在每个产品中都是一致的。
我尝试通过结合内置函数和 UDF 来实现这一点。例如:
a_means = df.groupBy("product").agg(mean("rating").alias("a_mean")
g_means = df.groupBy("product").agg(udf_gmean("rating").alias("g_mean")
地点:
def g_mean(x):
gm = reduce(mul,x)**(1/len(x))
return gm
udf_gmean = udf(g_mean, FloatType())
然后,我会将 a_means 和 g_means 输出与产品上的原始数据框连接起来,并删除重复项。但是,此方法返回错误,对于 g_means,说明 groupBy 中不涉及“评级”,也不是用户定义的聚合函数....
我也尝试过使用 SciPy 的 gmean 模块,但我收到的错误消息指出 ufunc 'log' 不适合输入类型,尽管据我所知,所有评级列都是整数类型。
网站上有类似的问题,但我找不到任何东西似乎可以解决我遇到的这个问题。我真的很感激你的帮助,因为它让我发疯了!
在此先感谢,如果我提供的信息还不够,我今天应该能够迅速提供任何进一步的信息。
值得注意的是,为了提高效率,我无法像使用 Pandas 数据框那样简单地转换为 Pandas 和转换...而且我使用的是 Spark 2.2 并且无法更新!
【问题讨论】:
-
您在函数 g_mean 中所指的“评级”是什么,未定义
-
道歉;现在显示了我的代码中的实际功能。
-
这是一个重复的问题,虽然我被限制使用没有相同功能的 Spark 2.2。
标签: python apache-spark dataframe pyspark apache-spark-sql