【发布时间】:2017-12-27 16:14:03
【问题描述】:
我正在寻找一种方法来对从数据帧上的 groupBy() 调用返回的分组数据运行 spark.ml.feature.PCA 函数。但我不确定这是否可能,或者如何实现。这是一个基本示例,希望能说明我想要做什么:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
df = spark.createDataFrame([[3, 1, 1], [4, 2, 1], [5, 2, 1], [3, 3, 2], [6, 2, 2], [4, 4, 2]], ["Value1", "Value2", "ID"])
df.show()
+------+------+---+
|Value1|Value2| ID|
+------+------+---+
| 3| 1| 1|
| 4| 2| 1|
| 5| 2| 1|
| 3| 3| 2|
| 6| 2| 2|
| 4| 4| 2|
+------+------+---+
assembler = VectorAssembler(inputCols=["Value1", "Value2"], outputCol="features")
df2 = assembler.transform(df)
df2.show()
+------+------+---+---------+
|Value1|Value2| ID| features|
+------+------+---+---------+
| 3| 1| 1|[3.0,1.0]|
| 4| 2| 1|[4.0,2.0]|
| 5| 2| 1|[5.0,2.0]|
| 3| 3| 2|[3.0,3.0]|
| 6| 2| 2|[6.0,2.0]|
| 4| 4| 2|[4.0,4.0]|
+------+------+---+---------+
pca = PCA(k=1, inputCol="features", outputCol="component")
此时我有了想要使用的数据框和 pca 对象。我现在想在数据帧上执行 PCA,但按“ID”分组,因此我将获得 ID 为 1 的所有功能的 PCA,以及 ID 为 2 的所有功能的 PCA,只返回组件。我可以通过以下方式手动获取这些:
>>>> pca.fit(df2.where("ID==1")).pc
DenseMatrix(2, 1, [-0.8817, -0.4719], 0)
>>>> pca.fit(dff.where("ID==2")).pc
DenseMatrix(2, 1, [-0.8817, 0.4719], 0)
但我想在数据帧中的所有不同 ID 上并行运行它,例如:
df2.groupBy("ID").map(lambda group: pca.fit(group).pc)
但是你不能在这样的分组数据上使用map()。有没有办法做到这一点?
【问题讨论】:
-
我需要做类似的事情。我正在考虑编写一个 UDAF,它使用 Spark 的 RowMatrix 类中的代码作为指导,从分组向量中计算协方差矩阵,然后在协方差矩阵上调用 Breeze 的 SVD 以生成 PCA 矩阵。
-
最后成功了吗?
-
为什么不将您手动执行的操作包裹在
for loop中,覆盖每个唯一 ID ?然后,您可以将union生成的数据框重新合并为一个。据我所知,这也将并行运行。 -
而且您可以控制聚合过程,因为 PCA 不聚合。
标签: python machine-learning pyspark pca apache-spark-mllib