【问题标题】:Fill Nan with mean of the row in Scala-Spark在 Scala-Spark 中用行的平均值填充 Nan
【发布时间】:2017-04-06 22:11:09
【问题描述】:

我有一个有 6 列的 RDD,其中最后 5 列可能包含 NaN。我的意图是将 NaN 替换为该行最后 5 个非 Nan 值的平均值。例如,有这个输入:

1, 2, 3, 4, 5, 6
2, 2, 2, NaN, 4, 0
3, NaN, NaN, NaN, 6, 0
4, NaN, NaN, 4, 4, 0 

输出应该是:

1, 2, 3, 4, 5, 6
2, 2, 2, 2, 4, 0
3, 3, 3, 3, 6, 0
4, 3, 3, 4, 4, 0

我知道如何用将RDD 转换为DataFrame 的列的平均值填充这些 NaN:

var aux1 = df.select(df.columns.map(c => mean(col(c))) :_*)
var aux2 = df.na.fill(/*get values of aux1*/)

我的问题是,如何执行此操作,而不是用列平均值填充 NaN,而是用行子组的值的平均值填充它?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您可以通过定义一个函数来获取平均值,并定义另一个函数来填充一行中的空值。

    鉴于您提供的 DF:

    val df = sc.parallelize(List((Some(1),Some(2),Some(3),Some(4),Some(5),Some(6)),(Some(2),Some(2),Some(2),None,Some(4),Some(0)),(Some(3),None,None,None,Some(6),Some(0)),(Some(4),None,None,Some(4),Some(4),Some(0)))).toDF("a","b","c","d","e","f")
    

    我们需要一个函数来获取行的平均值:

    import org.apache.spark.sql.Row
    def rowMean(row: Row): Int = {
       val nonNulls = (0 until row.length).map(i => (!row.isNullAt(i), row.getAs[Int](i))).filter(_._1).map(_._2).toList
       nonNulls.sum / nonNulls.length
    }
    

    还有一个用于在一行中填充空值:

    def rowFillNulls(row: Row, fill: Int): Row = {
       Row((0 until row.length).map(i => if (row.isNullAt(i)) fill else row.getAs[Int](i)) : _*)
    }
    

    现在我们可以先计算每一行的平均值:

    val rowWithMean = df.map(row => (row,rowMean(row)))
    

    然后填写:

    val result = sqlContext.createDataFrame(rowWithMean.map{case (row,mean) => rowFillNulls(row,mean)}, df.schema)
    

    最后查看前后...

    df.show
    +---+----+----+----+---+---+
    |  a|   b|   c|   d|  e|  f|
    +---+----+----+----+---+---+
    |  1|   2|   3|   4|  5|  6|
    |  2|   2|   2|null|  4|  0|
    |  3|null|null|null|  6|  0|
    |  4|null|null|   4|  4|  0|
    +---+----+----+----+---+---+
    
    result.show
    +---+---+---+---+---+---+
    |  a|  b|  c|  d|  e|  f|
    +---+---+---+---+---+---+
    |  1|  2|  3|  4|  5|  6|
    |  2|  2|  2|  2|  4|  0|
    |  3|  3|  3|  3|  6|  0|
    |  4|  3|  3|  4|  4|  0|
    +---+---+---+---+---+---+
    

    这适用于具有 Int 列的任何宽度 DF。您可以轻松地将其更新为其他数据类型,甚至是非数字(提示,检查 df 架构!)

    【讨论】:

      【解决方案2】:

      一堆进口:

      import org.apache.spark.sql.functions.{col, isnan, isnull, round, when}
      import org.apache.spark.sql.Column
      

      一些辅助函数:

      def nullOrNan(c: Column) = isnan(c) || isnull(c)
      
      def rowMean(cols: Column*): Column = {
        val sum = cols
          .map(c => when(nullOrNan(c), lit(0.0)).otherwise(c))
          .fold(lit(0.0))(_ + _)
        val count = cols
          .map(c => when(nullOrNan(c), lit(0.0)).otherwise(lit(1.0)))
          .fold(lit(0.0))(_ + _)
        sum / count
      }
      

      解决方案:

      val mean = round(
        rowMean(df.columns.tail.map(col): _*)
      ).cast("int").alias("mean")
      
      val exprs = df.columns.tail.map(
        c => when(nullOrNan(col(c)), mean).otherwise(col(c)).alias(c)
      )
      
      val filled = df.select(col(df.columns(0)) +: exprs: _*)
      

      【讨论】:

        【解决方案3】:

        嗯,这是一个有趣的小问题 - 我会发布我的解决方案,但我一定会观察,看看是否有人想出了更好的方法:)

        首先我要介绍几个udfs:

        val avg = udf((values: Seq[Integer]) => {
          val notNullValues = values.filter(_ != null).map(_.toInt)
          notNullValues.sum/notNullValues.length
        })
        
        val replaceNullWithAvg = udf((x: Integer, avg: Integer) => if(x == null) avg else x)
        

        然后我会像这样申请DataFrame

        dataframe
          .withColumn("avg", avg(array(df.columns.tail.map(s => df.col(s)):_*)))
          .select('col1, replaceNullWithAvg('col2, 'avg) as "col2", replaceNullWithAvg('col3, 'avg) as "col3", replaceNullWithAvg('col4, 'avg) as "col4", replaceNullWithAvg('col5, 'avg) as "col5", replaceNullWithAvg('col6, 'avg) as "col6")
        

        这将为您提供所需的内容,但可以说不是我所编写的最复杂的代码......

        【讨论】:

        • 所以,我改进了我的答案,使 avg-udf 能够处理任意数量的列。我尊重您已经接受了另一个答案,但我想指出我的解决方案不需要您在rddsdataframes 之间来回切换,而是直接在dataframe 上操作:)跨度>
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2021-10-01
        • 1970-01-01
        • 2018-08-16
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多