【Question title】: Spark: Replace Null value in a Nested column
【Posted】: 2020-07-18 16:16:45
【Question】:

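I want to replace every n/a value in the DataFrame below with unknown. The value can sit in a scalar column or in a complex nested column. For a StructField column, I can loop over the columns and replace n/a using withColumn. But I would like this to be done in a generic way, regardless of the column type, because I don't want to specify the column names explicitly: there are 100s of them in my case.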

case class Bar(x: Int, y: String, z: String)
case class Foo(id: Int, name: String, status: String, bar: Seq[Bar])

val df = spark.sparkContext.parallelize(
Seq(
  Foo(123, "Amy", "Active", Seq(Bar(1, "first", "n/a"))),
  Foo(234, "Rick", "n/a", Seq(Bar(2, "second", "fifth"),Bar(22, "second", "n/a"))),
  Foo(567, "Tom", "null", Seq(Bar(3, "second", "sixth")))
)).toDF

df.printSchema
df.show(20, false)

Result:

+---+----+------+---------------------------------------+
|id |name|status|bar                                    |
+---+----+------+---------------------------------------+
|123|Amy |Active|[[1, first, n/a]]                      |
|234|Rick|n/a   |[[2, second, fifth], [22, second, n/a]]|
|567|Tom |null  |[[3, second, sixth]]                   |
+---+----+------+---------------------------------------+   

Expected output:

+---+----+----------+---------------------------------------------------+
|id |name|status    |bar                                                |
+---+----+----------+---------------------------------------------------+
|123|Amy |Active    |[[1, first, unknown]]                              |
|234|Rick|unknown   |[[2, second, fifth], [22, second, unknown]]        |
|567|Tom |null      |[[3, second, sixth]]                               |
+---+----+----------+---------------------------------------------------+

Any suggestions?

【Comments】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    If you are happy to work with RDDs, here is a simple, generic, and easily extensible solution:

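      import org.apache.spark.sql.Row

      // Recursively walk a Row and replace every "n/a" string with "unknown".
      val naToUnknown = { r: Row =>
        def rec(v: Any): Any = v match {
          case row: Row                => Row.fromSeq(row.toSeq.map(rec)) // nested struct
          case seq: Seq[_]             => seq.map(rec)                    // array column
          case s: String if s == "n/a" => "unknown"
          case other                   => other // nulls and all other values pass through
        }
        Row.fromSeq(r.toSeq.map(rec))
      }

      // Rebuild the DataFrame with the original schema.
      val newDF = spark.createDataFrame(df.rdd.map(naToUnknown), df.schema)
      newDF.show(false)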
    

    Output:

    +---+----+-------+-------------------------------------------+
    |id |name|status |bar                                        |
    +---+----+-------+-------------------------------------------+
    |123|Amy |Active |[[1, first, unknown]]                      |
    |234|Rick|unknown|[[2, second, fifth], [22, second, unknown]]|
    |567|Tom |null   |[[3, second, sixth]]                       |
    +---+----+-------+-------------------------------------------+
    

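    【Comments】:

    • Great answer. I would just add that if you call df.map instead of df.rdd.map, you need to pass RowEncoder(df.schema) as the implicit encoder argument: df.map{naToUnknown}(RowEncoder(df.schema))
    【Solution 2】: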

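    Replacing nested values is easy when you only have simple columns and structs. For array fields, you have to explode the structure before replacing, or use a UDF / higher-order functions; see my other answer here

    You can define a generic function that walks through the DataFrame schema and applies a lambda function func to replace whatever you want: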

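    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.StructType

    // Walks the schema and applies func to every leaf column; structs are rebuilt field by field.
    def replaceNestedValues(schema: StructType, func: Column => Column, path: Option[String] = None): Seq[Column] = {
      schema.fields.map(f => {
        // Backtick-quoted path to the current field, e.g. `bar`.`z`
        val p = path.fold(s"`${f.name}`")(c => s"$c.`${f.name}`")
        f.dataType match {
          case s: StructType => struct(replaceNestedValues(s, func, Some(p)): _*).alias(f.name)
          case _ => func(col(p)).alias(f.name)
        }
      })
    }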
    

    Before using this function, explode the array of structs bar like this:

    val df2 = df.select($"id", $"name", $"status", explode($"bar").alias("bar"))
    

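    Then use the when/otherwise functions to define a lambda function that takes a column and replaces n/a with unknown, and apply the transformation to the columns with the function above: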

    val replaceNaFunc: Column => Column = c => when(c === lit("n/a"), lit("unknown")).otherwise(c)
    val replacedCols = replaceNestedValues(df2.schema, replaceNaFunc)
    

    Select the new columns and groupBy to get the bar array back:

    df2.select(replacedCols: _*).groupBy($"id", $"name", $"status").agg(collect_list($"bar").alias("bar")).show(false)
    

    This gives:

    +---+----+-------+-------------------------------------------+                  
    |id |name|status |bar                                        |
    +---+----+-------+-------------------------------------------+
    |234|Rick|unknown|[[2, second, fifth], [22, second, unknown]]|
    |123|Amy |Active |[[1, first, unknown]]                      |
    |567|Tom |null   |[[3, second, sixth]]                       |
    +---+----+-------+-------------------------------------------+
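
    As an alternative to exploding and regrouping, the higher-order function route mentioned at the start of this answer can be driven by the schema. The following is a minimal sketch that is not part of the original answer. It assumes Spark 2.4 or later (for the SQL transform function), and replaceExpr and replaceAll are hypothetical helper names. It builds one SQL expression per column by recursing over the data type, so no column names are written out by hand. Known limitations of the sketch: a null struct comes back as a struct of nulls, map columns are left untouched, and field names containing quotes or backticks would need escaping.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types._

// Hypothetical helper: returns a SQL expression string that rewrites "n/a"
// to "unknown" inside the value referenced by `ref`, following its data type.
def replaceExpr(dt: DataType, ref: String, depth: Int = 0): String = dt match {
  case StringType =>
    s"CASE WHEN $ref = 'n/a' THEN 'unknown' ELSE $ref END"
  case st: StructType =>
    // Rebuild the struct field by field, recursing into each field.
    val parts = st.fields.map { f =>
      val child = replaceExpr(f.dataType, s"$ref.`${f.name}`", depth)
      s"'${f.name}', $child"
    }
    s"named_struct(${parts.mkString(", ")})"
  case ArrayType(elementType, _) =>
    // One lambda variable per nesting level, so nested arrays do not shadow each other.
    val v = s"x$depth"
    val body = replaceExpr(elementType, v, depth + 1)
    s"transform($ref, $v -> $body)"
  case _ =>
    ref // non-string leaves (and unsupported types such as maps) are left as they are
}

// Hypothetical helper: applies replaceExpr to every top-level column of df.
def replaceAll(df: DataFrame): DataFrame = {
  val cols = df.schema.fields.map { f =>
    expr(replaceExpr(f.dataType, s"`${f.name}`")).alias(f.name)
  }
  df.select(cols: _*)
}
```

    With the df from the question, replaceAll(df).show(false) should rewrite both status and the z field inside bar without an explode/groupBy round trip, which also keeps the original row and array order.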
    

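    【Comments】:

    • Thanks @blackbishop, but here too I need to specify the column names explicitly. Since I have many scalar and nested fields, can this be done without mentioning the column names?
    • @Leibnitz Unfortunately, with this solution you have to specify at least the array columns, because we need to explode them before replacing the values... For the other columns, though, it is possible to get them from @987654333@.
    【Solution 3】: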

    You can define a UDF that processes your array and replaces the items you want:

    UDF

     import org.apache.spark.sql.Row
     import org.apache.spark.sql.functions._

     // Rebuild each Bar, swapping "n/a" in the third field (z) for "unknown".
     val replaceNA = udf((x: Row) => {
       val z = x.getString(2)
       if (z == "n/a")
         Bar(x.getInt(0), x.getString(1), "unknown")
       else
         Bar(x.getInt(0), x.getString(1), z)
     })
    

    Once you have that UDF, you can explode your DataFrame so that each item in bar becomes its own row:

     val explodedDF = df.withColumn("exploded", explode($"bar"))
    +---+----+------+--------------------+------------------+
    | id|name|status|                 bar|          exploded|
    +---+----+------+--------------------+------------------+
    |123| Amy|Active|   [[1, first, n/a]]|   [1, first, n/a]|
    |234|Rick|   n/a|[[2, second, fift...|[2, second, fifth]|
    |234|Rick|   n/a|[[2, second, fift...| [22, second, n/a]|
    |567| Tom|  null|[[3, second, sixth]]|[3, second, sixth]|
    +---+----+------+--------------------+------------------+
    

    Then apply the UDF defined earlier to replace the items:

    val replacedDF = explodedDF.withColumn("exploded", replaceNA($"exploded"))
    +---+----+------+--------------------+--------------------+
    | id|name|status|                 bar|            exploded|
    +---+----+------+--------------------+--------------------+
    |123| Amy|Active|   [[1, first, n/a]]| [1, first, unknown]|
    |234|Rick|   n/a|[[2, second, fift...|  [2, second, fifth]|
    |234|Rick|   n/a|[[2, second, fift...|[22, second, unkn...|
    |567| Tom|  null|[[3, second, sixth]]|  [3, second, sixth]|
    +---+----+------+--------------------+--------------------+
    

    Finally, group everything back together with collect_list to return to the original shape. Note that the top-level status column still contains n/a, because the UDF only rewrites the elements of bar:

     val resultDF = replacedDF.groupBy("id", "name", "status")
          .agg(collect_list("exploded").as("bar"))
     resultDF.show(false)
    +---+----+------+-------------------------------------------+
    |id |name|status|bar                                        |
    +---+----+------+-------------------------------------------+
    |234|Rick|n/a   |[[2, second, fifth], [22, second, unknown]]|
    |567|Tom |null  |[[3, second, sixth]]                       |
    |123|Amy |Active|[[1, first, unknown]]                      |
    +---+----+------+-------------------------------------------+
    

    All in one step:

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    
     val replaceNA = udf((x: Row) => {
       val z = x.getString(2)
       if (z == "n/a")
         Bar(x.getInt(0), x.getString(1), "unknown")
       else
         Bar(x.getInt(0), x.getString(1), z)
     })
    
    df.withColumn("exploded", explode($"bar"))
     .withColumn("exploded", replaceNA($"exploded"))
     .groupBy("id", "name", "status")
     .agg(collect_list("exploded").as("bar"))
    

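    【Comments】:

    • Thanks @Scouto, but here too I need to specify the column names explicitly.
    • You can groupBy on everything in the schema except the array columns.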
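
    The suggestion in the last comment could look roughly like the sketch below. It is an illustration rather than code from the thread; nonArrayColumnNames is a hypothetical helper name. It reads the grouping columns from the schema, so only the array column being exploded still has to be named.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: names of all top-level columns whose type is not an array.
def nonArrayColumnNames(schema: StructType): Seq[String] =
  schema.fields.toSeq
    .filterNot(_.dataType.isInstanceOf[ArrayType])
    .map(_.name)

// Usage with the df and replaceNA UDF from this answer (left commented out because
// it needs a running SparkSession and org.apache.spark.sql.functions._ in scope):
//
//   val groupCols = nonArrayColumnNames(df.schema).map(col)
//   df.withColumn("exploded", explode($"bar"))
//     .withColumn("exploded", replaceNA($"exploded"))
//     .groupBy(groupCols: _*)
//     .agg(collect_list("exploded").as("bar"))
```

    This only removes the hard-coded groupBy list. The UDF itself is still tied to the shape of Bar, so it does not make the replacement fully generic.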