【问题标题】:Replace missing values with mean - Spark Dataframe用平均值替换缺失值 - Spark Dataframe
【发布时间】:2017-02-24 17:39:39
【问题描述】:

我有一个包含一些缺失值的 Spark Dataframe。我想通过用该列的平均值替换缺失值来执行简单的插补。我对 Spark 很陌生,所以我一直在努力实现这个逻辑。到目前为止,这是我设法做到的:

a) 要为单个列(假设 Col A)执行此操作,这行代码似乎可以工作:

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
  .first()(0).asInstanceOf[Double])
  .otherwise($"ColA"))

b) 但是,我无法弄清楚如何对数据框中的所有列执行此操作。我正在尝试 Map 功能,但我相信它会循环遍历数据框的每一行

c) 在 SO - here 上有一个类似的问题。虽然我喜欢这个解决方案(使用聚合表和合并),但我很想知道是否有办法通过循环遍历每一列来做到这一点(我来自 R,所以使用更高阶的函数循环遍历每一列,比如lapply 对我来说似乎更自然)。

谢谢!

【问题讨论】:

  • 顺便说一句,在scala 中使用asInstanceOf[T] 被认为是一种不好的做法。

标签: scala apache-spark dataframe apache-spark-sql imputation


【解决方案1】:

火花 >= 2.2

您可以使用org.apache.spark.ml.feature.Imputer(支持均值和中值策略)。

斯卡拉

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setInputCols(df.columns)
  .setOutputCols(df.columns.map(c => s"${c}_imputed"))
  .setStrategy("mean")

imputer.fit(df).transform(df)

Python

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=df.columns, 
    outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)

火花

你在这里:

import org.apache.spark.sql.functions.mean

df.na.fill(df.columns.zip(
  df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)

在哪里

df.columns.map(mean(_)): Array[Column] 

计算每列的平均值,

df.select(_: *).first.toSeq: Seq[Any]

收集聚合值并将行转换为Seq[Any](我知道它不是最理想的,但这是我们必须使用的 API),

df.columns.zip(_).toMap: Map[String,Any] 

创建aMap: Map[String, Any],它从列名映射到它的平均值,最后:

df.na.fill(_): DataFrame

使用以下方法填充缺失值:

fill: Map[String, Any] => DataFrame 

来自DataFrameNaFunctions

要引入 NaN 条目,您可以替换:

df.select(df.columns.map(mean(_)): _*).first.toSeq

与:

import org.apache.spark.sql.functions.{col, isnan, when}


df.select(df.columns.map(
  c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq

【讨论】:

  • 关于为什么要使用 pyspark 的 python 代码的任何想法,我都可以让它为均值工作,但是当我尝试执行 .setStrategy("median") 时,它会输出不正确的插补值?
  • This question 讨论了为什么并提供了潜在的新方法来解决问题。总结:当工作分布在多个工作节点而不是单个驱动节点(如 pandas)时,很难估计中间值。
【解决方案2】:

用于在 PySpark

## filter numeric cols
num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)]
### Compute a dict with <col_name, median_value>
median_dict = dict()
for c in num_cols:
   median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]

然后,申请na.fill

df_imputed = df.na.fill(median_dict)

【讨论】:

    【解决方案3】:

    对于 PySpark,这是我使用的代码:

    mean_dict = { col: 'mean' for col in df.columns }
    col_avgs = df.agg( mean_dict ).collect()[0].asDict()
    col_avgs = { k[4:-1]: v for k,v in col_avgs.iteritems() }
    df.fillna( col_avgs ).show()
    

    四个步骤是:

    1. 创建字典mean_dict 将列名映射到聚合操作(平均值)
    2. 计算每一列的平均值,并将其保存为字典col_avgs
    3. col_avgs 中的列名以avg( 开头,以) 结尾,例如avg(col1)。去掉括号。
    4. 使用col_avgs 用平均值填充数据框的列

    【讨论】:

      猜你喜欢
      • 2018-02-05
      • 2012-05-03
      • 2018-12-27
      • 1970-01-01
      • 1970-01-01
      • 2013-09-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多