【问题标题】:Spark - Drop null values from map columnSpark - 从地图列中删除空值
【发布时间】:2021-02-25 18:14:06
【问题描述】:

我正在使用 Spark 读取 CSV 文件,然后收集所有字段以创建地图。有些字段是空的,我想从地图中删除它们。

所以对于如下所示的 CSV:

"animal", "colour", "age"
"cat"   , "black" ,
"dog"   ,         , "3"

我想获取包含以下地图的数据集:

Map("animal" -> "cat", "colour" -> "black")
Map("animal" -> "dog", "age" -> "3")

这是我目前所拥有的:

val csv_cols_n_vals: Array[Column] = csv.columns.flatMap { c => Array(lit(c), col(c)) }

sparkSession.read
    .option("header", "true")
    .csv(csvLocation)
    .withColumn("allFieldsMap", map(csv_cols_n_vals: _*))

我尝试了一些变体,但似乎找不到正确的解决方案。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    使用Dataframe API 肯定有更好更高效的方法,但这里有一个map/flatmap 解决方案:

    val df = Seq(("cat", "black", null), ("dog", null, "3")).toDF("animal", "colour", "age")
    val cols = df.columns
    
    df.map(r => {
       cols.flatMap( c => {
           val v = r.getAs[String](c)
           if (v != null) {
               Some(Map(c -> v))
           } else {
               None
           }
       }).reduce(_ ++ _)
    }).toDF("map").show(false)
    

    产生:

    +--------------------------------+
    |map                             |
    +--------------------------------+
    |[animal -> cat, colour -> black]|
    |[animal -> dog, age -> 3]       |
    +--------------------------------+
    

    【讨论】:

    • 如果你提到的使用Spark SQL的API从优化中受益的解决方案会很高兴
    【解决方案2】:
    scala> df.show(false)
    +------+------+----+
    |animal|colour|age |
    +------+------+----+
    |cat   |black |null|
    |dog   |null  |3   |
    +------+------+----+
    

    构建表达式

    val colExpr = df
    .columns // getting list of columns from dataframe.
    .map{ columnName =>
        when(
            col(columnName).isNotNull, // checking if column is not null
            map(
                lit(columnName),
                col(columnName)
            ) // Adding column name and its value inside map
        )
        .otherwise(map())
    }
    .reduce(map_concat(_,_)) 
    // finally using map_concat function to concat map values.
    
    

    上面的代码将创建下面的表达式。

    map_concat(
        map_concat(
            CASE WHEN (animal IS NOT NULL) THEN map(animal, animal) ELSE map() END, 
            CASE WHEN (colour IS NOT NULL) THEN map(colour, colour) ELSE map() END
        ), 
            CASE WHEN (age IS NOT NULL) THEN map(age, age) ELSE map() END
    )
    

    在 DataFrame 上应用 colExpr

    scala> 
    
    df
    .withColumn("allFieldsMap",colExpr)
    .show(false)
    
    +------+------+----+--------------------------------+
    |animal|colour|age |allFieldsMap                    |
    +------+------+----+--------------------------------+
    |cat   |black |null|[animal -> cat, colour -> black]|
    |dog   |null  |3   |[animal -> dog, age -> 3]       |
    +------+------+----+--------------------------------+
    
    

    【讨论】:

      【解决方案3】:

      Spark-sql 解决方案:

      val df = Seq(("cat", "black", null), ("dog", null, "3")).toDF("animal", "colour", "age")
      
      df.show(false)
      
      +------+------+----+
      |animal|colour|age |
      +------+------+----+
      |cat   |black |null|
      |dog   |null  |3   |
      +------+------+----+
      
      df.createOrReplaceTempView("a_vw")
      val cols_str = df.columns.flatMap( x => Array("\"".concat(x).concat("\""),x)).mkString(",")
      
      spark.sql(s""" 
      select collect_list(m2) res from (
      select id, key, value, map(key,value) m2 from (
      select id, explode(m) as (key,value) from 
          ( select monotonically_increasing_id() id, map(${cols_str}) m from a_vw )
          )
       where value is not null
      ) group by id
      """)
      .show(false)
      
      +------------------------------------+
      |res                                 |
      +------------------------------------+
      |[[animal -> cat], [colour -> black]]|
      |[[animal -> dog], [age -> 3]]       |
      +------------------------------------+
      

      或者更短

      spark.sql(s""" 
      select collect_list(case when value is not null then map(key,value) end ) res from (
      select id, explode(m) as (key,value) from 
          ( select monotonically_increasing_id() id, map(${cols_str}) m from a_vw )
      ) group by id
      """)
      .show(false)
      
      +------------------------------------+
      |res                                 |
      +------------------------------------+
      |[[animal -> cat], [colour -> black]]|
      |[[animal -> dog], [age -> 3]]       |
      +------------------------------------+
      

      【讨论】:

        猜你喜欢
        • 2021-06-07
        • 2020-02-26
        • 1970-01-01
        • 1970-01-01
        • 2021-12-07
        • 2019-09-05
        • 2016-01-21
        • 2021-08-16
        • 1970-01-01
        相关资源
        最近更新 更多