【问题标题】:PySpark fill missing/wrong value with grouped meanPySpark 使用分组平均值填充缺失/错误值
【发布时间】:2019-01-19 21:22:49
【问题描述】:

我有一个 Spark 数据框,其中一个缺失值和一个错误值。

from pyspark.sql import Row
from pyspark.sql.types import StringType, DoubleType, StructType, StructField
# fruit sales data
data = [Row(id='01', fruit='Apple', qty=5.0),
        Row(id='02', fruit='Apple', qty=1.0),
        Row(id='03', fruit='Apple', qty=None),
        Row(id='04', fruit='Pear', qty=6.0),
        Row(id='05', fruit='Pear', qty=2.0),
        Row(id='06', fruit='Mango', qty=6.0),
        Row(id='07', fruit='Mango', qty=-4.0),
        Row(id='08', fruit='Mango', qty=2.0)]
# create dataframe
df = spark.createDataFrame(data)
df.show()
+-----+---+----+
|fruit| id| qty|
+-----+---+----+
|Apple| 01| 5.0|
|Apple| 02| 1.0|
|Apple| 03|null|
| Pear| 04| 6.0|
| Pear| 05| 2.0|
|Mango| 06| 6.0|
|Mango| 07|-4.0|
|Mango| 08| 2.0|
+-----+---+----+

按整列均值进行填充很简单。但是我怎么能做一个分组的意思呢?为了说明,我希望将第 3 行中的 null 替换为 mean(qty) 替换为 Apple - 在本例中为 (5+1)/2=3。同样,-4.0 是第 7 行中的错误值(无负数量),我想用 (6+2)/2=4

替换

在纯 Python 中,我会这样做:

def replace_with_grouped_mean(df, value, column, to_groupby):
    invalid_mask = (df[column] == value)
    # get the mean without the invalid value
    means_by_group = (df[~invalid_mask].groupby(to_groupby)[column].mean())
    # get an array of the means for all of the data
    means_array = means_by_group[df[to_groupby].values].values
    # assign the invalid values to means
    df.loc[invalid_mask, column] = means_array[invalid_mask]
    return df

最终做到:

x = replace_with_grouped_mean(df=df, value=-4, column='qty', to_groupby='fruit')

但是,我不太确定如何在 PySpark 中实现这一点。任何帮助/指针表示赞赏!

【问题讨论】:

    标签: pyspark apache-spark-sql databricks


    【解决方案1】:

    注意点:当我们进行分组时,具有Null 的行将被忽略。如果我们有 3 行,其中之一的值为 Null,则平均除以 2,而不是 3,因为第三个值为 Null。这里的关键是使用Window()函数。

    from pyspark.sql.functions import avg, col, when
    from pyspark.sql.window import Window
    w = Window().partitionBy('fruit')
    
    #Replace negative values of 'qty' with Null, as we don't want to consider them while averaging.
    df = df.withColumn('qty',when(col('qty')<0,None).otherwise(col('qty')))
    df = df.withColumn('qty',when(col('qty').isNull(),avg(col('qty')).over(w)).otherwise(col('qty')))
    df.show()
    +-----+---+---+
    |fruit| id|qty|
    +-----+---+---+
    | Pear| 04|6.0|
    | Pear| 05|2.0|
    |Mango| 06|6.0|
    |Mango| 07|4.0|
    |Mango| 08|2.0|
    |Apple| 01|5.0|
    |Apple| 02|1.0|
    |Apple| 03|3.0|
    +-----+---+---+
    

    【讨论】:

    • 谢谢,在这里我想我需要一些更详细的东西。必须深入挖掘才能了解 Window() 功能。
    • 许多人可能会使用聚合.agg()mean 存储在另一个dataframe 中,然后将join 存储在两个dataframes 中,这隐含地是冗长且效率低下的,因为洗牌。 Windows() 更加高效简洁。
    • 如何在具有模式的分类特征的情况下进行估算?
    猜你喜欢
    • 2021-12-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-28
    • 1970-01-01
    • 2021-10-25
    相关资源
    最近更新 更多