【问题标题】:Pyspark StandardScaler over a Window窗口上的 Pyspark StandardScaler
【发布时间】:2020-09-21 16:51:04
【问题描述】:

我想在我的数据窗口上使用标准缩放器pyspark.ml.feature.StandardScaler

df4=spark.createDataFrame(
    [
        (1,1, 'X', 'a'),
        (2,1, 'X', 'a'),
        (3,9, 'X', 'b'),
        (5,1, 'X', 'b'),
        (6,2, 'X', 'c'),
        (7,2, 'X', 'c'),
        (8,10, 'Y', 'a'),
        (9,45, 'Y', 'a'),
        (10,3, 'Y', 'a'),
        (11,3, 'Y', 'b'),
        (12,6, 'Y', 'b'),
        (13,19,'Y', 'b')
    ],
    ['id','feature', 'txt', 'cat'] 
)

w = Window().partitionBy(..)

我可以通过调用 .fit& .transform 方法对整个数据框执行此操作。但不是在我们通常使用的w 变量上,例如F.col('feature') - F.mean('feature').over(w)

我可以将所有窗口化/分组数据转换为单独的列,将其放入数据框中,然后对其应用 StandardScaler 并转换回1D。还有其他方法吗?最终目标是尝试不同的缩放器,包括pyspark.ml.feature.RobustScaler

【问题讨论】:

    标签: python-3.x apache-spark pyspark window-functions


    【解决方案1】:

    我最终不得不编写自己的缩放器类。在上述问题中使用pyspark StandardScaler 是不合适的,因为我们都知道端到端系列转换更有效。尽管如此,我还是想出了自己的缩放器。它并没有真正使用来自 pyspark 的 Window,但我使用 groupby 实现了该功能。

    class StandardScaler:
        
        tol = 0.000001
        
        def __init__(self, colsTotransform, groupbyCol='txt', orderBycol='id'):
            self.colsTotransform = colsTotransform
            self.groupbyCol=groupbyCol
            self.orderBycol=orderBycol
        
        def __tempNames__(self):
            return [(f"{colname}_transformed",colname) for colname in self.colsTotransform]
        
        def fit(self, df):
            funcs = [(F.mean(name), F.stddev(name)) for name in self.colsTotransform]
            exprs = [ff for tup in funcs for ff in tup]
            self.stats = df.groupBy([self.groupbyCol]).agg(*exprs)
        
        def __transformOne__(self, df_with_stats, newName, colName):
            return df_with_stats\
                    .withColumn(newName, 
                                (F.col(colName)-F.col(f'avg({colName})'))/(F.col(f'stddev_samp({colName})')+self.tol))\
                    .drop(colName)\
                    .withColumnRenamed(newName, colName)
    
        def transform(self, df):
            df_with_stats = df.join(self.stats, on=self.groupbyCol, how='inner').orderBy(self.orderBycol)
            return reduce(lambda df_with_stats, kv: self.__transformOne__(df_with_stats, *kv), 
                           self.__tempNames__(), df_with_stats)[df.columns]
        
       
    

    用法:

    ss = StandardScaler(colsTotransform=['feature'],groupbyCol='txt',orderbyCol='id')
    ss.fit(df4)
    ss.stats.show()
    
    +---+------------------+--------------------+
    |txt|      avg(feature)|stddev_samp(feature)|
    +---+------------------+--------------------+
    |  Y|14.333333333333334|  16.169930941926335|
    |  X|2.6666666666666665|  3.1411250638372654|
    +---+------------------+--------------------+
    
    df4.show()
    
    +---+-------+---+---+
    | id|feature|txt|cat|
    +---+-------+---+---+
    |  1|      1|  X|  a|
    |  2|      1|  X|  a|
    |  3|      9|  X|  b|
    |  5|      1|  X|  b|
    |  6|      2|  X|  c|
    |  7|      2|  X|  c|
    |  8|     10|  Y|  a|
    |  9|     45|  Y|  a|
    | 10|      3|  Y|  a|
    | 11|      3|  Y|  b|
    | 12|      6|  Y|  b|
    | 13|     19|  Y|  b|
    +---+-------+---+---+
    
    ss.transform(df4).show()
    +---+--------------------+---+---+
    | id|             feature|txt|cat|
    +---+--------------------+---+---+
    |  1|  -0.530595281053646|  X|  a|
    |  2|  -0.530595281053646|  X|  a|
    |  3|  2.0162620680038548|  X|  b|
    |  5|  -0.530595281053646|  X|  b|
    |  6|-0.21223811242145835|  X|  c|
    |  7|-0.21223811242145835|  X|  c|
    |  8| -0.2679871102053074|  Y|  a|
    |  9|  1.8965241645298676|  Y|  a|
    | 10| -0.7008893651523425|  Y|  a|
    | 11| -0.7008893651523425|  Y|  b|
    | 12| -0.5153598273178989|  Y|  b|
    | 13|  0.2886015032980233|  Y|  b|
    +---+--------------------+---+---+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-01-20
      • 2021-07-02
      • 2018-02-02
      • 1970-01-01
      • 2016-05-10
      • 2021-05-06
      • 2019-08-16
      相关资源
      最近更新 更多