【问题标题】:Replace Null Values of a Column with mean of another Categorcial Column in Spark Dataframe用 Spark Dataframe 中另一个分类列的平均值替换列的空值
【发布时间】:2017-07-10 22:10:33
【问题描述】:

我有一个这样的数据集

id    category     value
1     A            NaN
2     B            NaN
3     A            10.5
5     A            2.0
6     B            1.0

我想用它们各自类别的平均值填充 NAN 值。如下图

id    category     value
1     A            4.16
2     B            0.5
3     A            10.5
5     A            2.0
6     B            1.0

我尝试使用 group by 计算每个类别的第一个平均值

val df2 = dataFrame.groupBy(category).agg(mean(value)).rdd.map{
      case r:Row => (r.getAs[String](category),r.get(1))
    }.collect().toMap
    println(df2)

我得到了每个类别的地图及其各自的平均值。output: Map(A ->4.16,B->0.5) 现在我尝试在 Sparksql 中更新查询来填充列,但似乎 spqrkSql dosnt 支持更新查询。我试图在数据框中填充空值,但没有这样做。 我能做些什么?我们可以在 pandas 中做同样的事情,如Pandas: How to fill null values with mean of a groupby? 所示 但是我该如何使用 spark 数据框

【问题讨论】:

    标签: database scala apache-spark apache-spark-sql


    【解决方案1】:

    我偶然发现了同样的问题并看到了这篇文章。但尝试了不同的解决方案,即使用窗口函数。下面的代码在 pyspark 2.4.3 上进行了测试(Spark 1.4 提供了窗口函数)。我相信这是更清洁的解决方案。 这篇文章很老了,但希望这个答案对其他人有帮助。

    from pyspark.sql import Window
    from pyspark.sql.functions import *
    
    df = spark.createDataFrame([(1,"A", None), (2,"B", None), (3,"A",10.5), (5,"A",2.0), (6,"B",1.0)], ['id', 'category', 'value'])
    
    category_window = Window.partitionBy("category")
    value_mean = mean("value0").over(category_window)
    
    result = df\
      .withColumn("value0", coalesce("value", lit(0)))\
      .withColumn("value_mean", value_mean)\
      .withColumn("new_value", coalesce("value", "value_mean"))\
      .select("id", "category", "new_value")
    
    result.show()
    

    输出将如预期(有问题):

    id  category    new_value       
    1   A   4.166666666666667
    2   B   0.5
    3   A   10.5
    5   A   2
    6   B   1
    

    【讨论】:

      【解决方案2】:

      最简单的解决方案是使用 groupby 并加入:

       val df2 = df.filter(!(isnan($"value"))).groupBy("category").agg(avg($"value").as("avg"))
       df.join(df2, "category").withColumn("value", when(col("value").isNaN, $"avg").otherwise($"value")).drop("avg")
      

      请注意,如果有一个全为 NaN 的类别,它将从结果中删除

      【讨论】:

      • 具体在哪里?也做 df.printSchema()。我的假设是价值是 Double...
      【解决方案3】:

      确实,您无法更新 DataFrame,但您可以使用selectjoin 等函数转换它们。在这种情况下,您可以将分组结果保留为 DataFrame 并将其(在 category 列上)加入原始结果,然后执行将 NaNs 替换为平均值的映射:

      import org.apache.spark.sql.functions._
      import spark.implicits._
      
      // calculate mean per category:
      val meanPerCategory = dataFrame.groupBy("category").agg(mean("value") as "mean")
      
      // use join, select and "nanvl" function to replace NaNs with the mean values:
      val result = dataFrame
        .join(meanPerCategory, "category")
        .select($"category", $"id", nanvl($"value", $"mean")).show()
      

      【讨论】:

      • 要替换空值,您必须将 nanvl 函数替换为 coalesce。或者同时处理:coalesce($"value", nanvl($"value", $"mean"))
      • 抱歉应该是coalesce(nanvl($"value", $"mean"), $"mean")
      • Y 确实导入 spark.implicits._ 无法导入。
      • sparkSparkSession - 如果名称不同,请替换名称;如果您没有 SparkSession,您应该有一个 SQLContext - 导入该上下文的隐式(例如 import sqlContext.implicits._,如果它被命名为 sqlContext
      • 使用 coalesce($"value", $"mean"), $"mean")) 对空值起作用 gr8。但是当我尝试 coalesce(nanvl($"value", $"mean"), $"mean") 时,它也不会填充空值
      猜你喜欢
      • 1970-01-01
      • 2016-11-16
      • 2013-09-12
      • 2017-02-24
      • 2023-02-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多