【问题标题】:Perform PCA on each group of a groupBy in PySpark在 PySpark 中对 groupBy 的每一组执行 PCA
【发布时间】:2017-12-27 16:14:03
【问题描述】:

我正在寻找一种方法来对从数据帧上的 groupBy() 调用返回的分组数据运行 spark.ml.feature.PCA 函数。但我不确定这是否可能,或者如何实现。这是一个基本示例,希望能说明我想要做什么:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA   

df = spark.createDataFrame([[3, 1, 1], [4, 2, 1], [5, 2, 1], [3, 3, 2], [6, 2, 2], [4, 4, 2]], ["Value1", "Value2",  "ID"])

df.show()
+------+------+---+
|Value1|Value2| ID|
+------+------+---+
|     3|     1|  1|
|     4|     2|  1|
|     5|     2|  1|
|     3|     3|  2|
|     6|     2|  2|
|     4|     4|  2|
+------+------+---+

assembler = VectorAssembler(inputCols=["Value1", "Value2"], outputCol="features")

df2 = assembler.transform(df)

df2.show()
+------+------+---+---------+
|Value1|Value2| ID| features|
+------+------+---+---------+
|     3|     1|  1|[3.0,1.0]|
|     4|     2|  1|[4.0,2.0]|
|     5|     2|  1|[5.0,2.0]|
|     3|     3|  2|[3.0,3.0]|
|     6|     2|  2|[6.0,2.0]|
|     4|     4|  2|[4.0,4.0]|
+------+------+---+---------+

pca = PCA(k=1, inputCol="features", outputCol="component")

此时我有了想要使用的数据框和 pca 对象。我现在想在数据帧上执行 PCA,但按“ID”分组,因此我将获得 ID 为 1 的所有功能的 PCA,以及 ID 为 2 的所有功能的 PCA,只返回组件。我可以通过以下方式手动获取这些:

>>>> pca.fit(df2.where("ID==1")).pc
DenseMatrix(2, 1, [-0.8817, -0.4719], 0)
>>>> pca.fit(dff.where("ID==2")).pc
DenseMatrix(2, 1, [-0.8817, 0.4719], 0)

但我想在数据帧中的所有不同 ID 上并行运行它,例如:

df2.groupBy("ID").map(lambda group: pca.fit(group).pc)

但是你不能在这样的分组数据上使用map()。有没有办法做到这一点?

【问题讨论】:

  • 我需要做类似的事情。我正在考虑编写一个 UDAF,它使用 Spark 的 RowMatrix 类中的代码作为指导,从分组向量中计算协方差矩阵,然后在协方差矩阵上调用 Breeze 的 SVD 以生成 PCA 矩阵。
  • 最后成功了吗?
  • 为什么不将您手动执行的操作包裹在 for loop 中,覆盖每个唯一 ID ?然后,您可以将union 生成的数据框重新合并为一个。据我所知,这也将并行运行。
  • 而且您可以控制聚合过程,因为 PCA 不聚合。

标签: python machine-learning pyspark pca apache-spark-mllib


【解决方案1】:

火花>=3.0.0

Spark 3.0.0 开始,您可以使用applyInPandas 对当前DataFrame 的每一组应用一个简单的Python 函数,并将结果作为另一个DataFrame 返回。您基本上需要定义返回的 DataFrame 的输出模式。

这里我将使用 scikit-learnPCA 函数而不是 Spark 实现,因为它必须应用于单个 pandas 数据帧,而不是 Spark 数据帧。无论如何,要找到的主要成分应该是相同的。

import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType


# define PCA parameters
cols = ['Value1', 'Value2']
pca_components = 1


# define Python function
def pca_udf(pdf):
    X = pdf[cols]
    pca = PCA(n_components=pca_components)
    PC = pca.fit_transform(X)
    PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
    result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
    return result


# define output schema; principal components are generated dynamically based on `pca_components`
to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
output_schema = StructType(df.schema.fields + to_append)


df\
  .groupby('ID')\
  .applyInPandas(pca_udf, output_schema)\
  .show()

+------+------+---+-------------------+
|Value1|Value2| ID|               PC_1|
+------+------+---+-------------------+
|     3|     1|  1| 1.1962465491226262|
|     4|     2|  1|-0.1572859751773413|
|     5|     2|  1|-1.0389605739452852|
|     3|     3|  2|-1.1755661316905914|
|     6|     2|  2|  1.941315590145264|
|     4|     4|  2|-0.7657494584546719|
+------+------+---+-------------------+

火花

Spark 3.0.0 之前 - 但仍然使用Spark>=2.3.0 - 解决方案类似,但我们需要实际定义一个pandas_udf,这是一个由 Spark 执行的矢量化用户定义函数,使用 Arrow 传输数据,Pandas 使用数据。无论如何定义它的概念与前面的概念相似。

import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType


# macro-function that includes the pandas_udf and allows to pass it some parameters
def pca_by_group(df, cols, pca_components=1):
    # build output schema for the Pandas UDF
    # principal components are generated dynamically based on `pca_components`
    to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
    output_schema = StructType(df.schema.fields + to_append)

    # Pandas UDF for applying PCA within each group
    @pandas_udf(output_schema, functionType=PandasUDFType.GROUPED_MAP)
    def pca_udf(pdf):
        X = pdf[cols]
        pca = PCA(n_components=pca_components)
        PC = pca.fit_transform(X)
        PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
        result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
        return result
    
    # apply the Pandas UDF
    df = df\
        .groupby('ID')\
        .apply(pca_udf)
    
    return df


new_df = pca_by_group(df, cols=['Value1', 'Value2'], pca_components=1)
new_df.show()

+------+------+---+-------------------+
|Value1|Value2| ID|               PC_1|
+------+------+---+-------------------+
|     3|     1|  1| 1.1962465491226262|
|     4|     2|  1|-0.1572859751773413|
|     5|     2|  1|-1.0389605739452852|
|     3|     3|  2|-1.1755661316905914|
|     6|     2|  2|  1.941315590145264|
|     4|     4|  2|-0.7657494584546719|
+------+------+---+-------------------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-07-22
    • 2016-10-11
    • 1970-01-01
    • 1970-01-01
    • 2021-12-23
    • 1970-01-01
    • 2018-06-18
    相关资源
    最近更新 更多