Spark-sql 解决方案:
val df = Seq(("cat", "black", null), ("dog", null, "3")).toDF("animal", "colour", "age")
df.show(false)
+------+------+----+
|animal|colour|age |
+------+------+----+
|cat |black |null|
|dog |null |3 |
+------+------+----+
df.createOrReplaceTempView("a_vw")
val cols_str = df.columns.flatMap( x => Array("\"".concat(x).concat("\""),x)).mkString(",")
spark.sql(s"""
select collect_list(m2) res from (
select id, key, value, map(key,value) m2 from (
select id, explode(m) as (key,value) from
( select monotonically_increasing_id() id, map(${cols_str}) m from a_vw )
)
where value is not null
) group by id
""")
.show(false)
+------------------------------------+
|res |
+------------------------------------+
|[[animal -> cat], [colour -> black]]|
|[[animal -> dog], [age -> 3]] |
+------------------------------------+
或者更短
spark.sql(s"""
select collect_list(case when value is not null then map(key,value) end ) res from (
select id, explode(m) as (key,value) from
( select monotonically_increasing_id() id, map(${cols_str}) m from a_vw )
) group by id
""")
.show(false)
+------------------------------------+
|res |
+------------------------------------+
|[[animal -> cat], [colour -> black]]|
|[[animal -> dog], [age -> 3]] |
+------------------------------------+