【问题标题】:Spark dataframe replace values of specific columns in a row with NullsSpark数据框用Nulls替换一行中特定列的值
【发布时间】:2019-02-03 19:07:27
【问题描述】:

我在尝试用空值替换 Spark 数据帧的特定列的值时遇到问题。 我有一个超过五十列的数据框,其中两列是关键列。我想创建一个具有相同架构的新数据框,并且新数据框应该具有来自键列的值和非键列中的空值。 我尝试了以下方法但遇到了问题:

//old_df is the existing Dataframe 
val key_cols = List("id", "key_number")
val non_key_cols = old_df.columns.toList.filterNot(key_cols.contains(_))

val key_col_df = old_df.select(key_cols.head, key_cols.tail:_*)
val non_key_cols_df = old_df.select(non_key_cols.head, non_key_cols.tail:_*)
val list_cols = List.fill(non_key_cols_df.columns.size)("NULL")
val rdd_list_cols = spark.sparkContext.parallelize(Seq(list_cols)).map(l => Row(l:_*))
val list_df = spark.createDataFrame(rdd_list_cols, non_key_cols_df.schema)

val new_df = key_col_df.crossJoin(list_df)

当我在old_df 中只有字符串类型的列时,这种方法很好。但是我有一些 double 类型和 int 类型的列会抛出错误,因为 rdd 是一个空字符串列表。

为了避免这种情况,我尝试将list_df 作为一个空数据帧,架构为non_key_cols_df,但crossJoin 的结果是一个空数据帧,我认为这是因为一个数据帧是空的。

我的要求是将 non_key_cols 作为带有 Null 的单行数据框,以便我可以使用 key_col_df 执行 crossJoin 并形成所需的 new_df

还有任何其他更简单的方法将除数据框的关键列之外的所有列更新为空值将解决我的问题。提前致谢

【问题讨论】:

    标签: scala apache-spark dataframe null apache-spark-sql


    【解决方案1】:

    crossJoin 是一项昂贵的操作,因此您希望尽可能避免它。 一个更简单的解决方案是遍历所有非键列并使用lit(null) 插入空值。使用foldLeft 可以按如下方式完成:

    val keyCols = List("id", "key_number")
    val nonKeyCols = df.columns.filterNot(keyCols.contains(_))
    
    val df2 = nonKeyCols.foldLeft(df)((df, c) => df.withColumn(c, lit(null)))
    

    输入示例:

    +---+----------+---+----+
    | id|key_number|  c|   d|
    +---+----------+---+----+
    |  1|         2|  3| 4.0|
    |  5|         6|  7| 8.0|
    |  9|        10| 11|12.0|
    +---+----------+---+----+
    

    将给予:

    +---+----------+----+----+
    | id|key_number|   c|   d|
    +---+----------+----+----+
    |  1|         2|null|null|
    |  5|         6|null|null|
    |  9|        10|null|null|
    +---+----------+----+----+
    

    【讨论】:

    • 感谢您的回复。我会试试的
    【解决方案2】:

    Shaido 答案有一个小缺点 - 列类型会丢失。 可以通过模式使用来修复,如下所示:

    val nonKeyCols = df.schema.fields.filterNot(f => keyCols.contains(f.name))
    val df2 = nonKeyCols.foldLeft(df)((df, c) => df.withColumn(c.name, lit(null).cast(c.dataType)))
    

    【讨论】:

    • 感谢您的指正,尽管我的要求即使使用 NullType 也能​​满足。我正在使用获得的 DF 与另一个 DF 联合,结果 DF 模式没有 NullType
    • 如果其他数据框为空,或者重构期间数据框联合顺序发生变化,可能会出现问题。猜猜,正确的架构是更可靠的解决方案。
    • 是的,当我在 spark shell 中运行代码时,我遇到了第一个 Dataframe 为空时的问题,我尝试使用 c.cast(dataType) 进行投射,但它抛出了一个错误,说不能投射,字符串不能有数据类型。
    猜你喜欢
    • 1970-01-01
    • 2020-04-14
    • 1970-01-01
    • 1970-01-01
    • 2020-10-17
    • 2019-02-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多