如果我理解正确,可以组合使用 explode 和 pivot 来完成:
scala> :paste
// Entering paste mode (ctrl-D to finish)
// Sample input: every id carries an array of (key, value) tuples.
val df = Seq(
  (1, Array(("a", 1), ("b", 2), ("c", 3))),
  (2, Array(("b", 5), ("c", 6), ("a", 4)))
).toDF("id", "col")
df.show(numRows = 10, truncate = false)

// Emit one row per array element; the tuple ends up in the struct column "col2".
val explodedDF = df.select($"id", explode($"col").as("col2"))
explodedDF.show(numRows = 10, truncate = false)

// Lift the struct fields _1 / _2 into top-level columns k / v.
val flattenedDF = explodedDF.select($"id", $"col2._1".as("k"), $"col2._2".as("v"))
flattenedDF.show(numRows = 10, truncate = false)

// Turn each distinct k into its own column, taking the first v per (id, k) group.
val pivotedDF = flattenedDF.groupBy($"id").pivot("k").agg(first($"v"))
pivotedDF.show(numRows = 10, truncate = false)

// NOTE(review): scala.util.parsing.json appears to be deprecated in newer Scala
// releases — confirm it is available for the Scala version in use.
import scala.util.parsing.json.JSONObject
// Print the a/b/c values of every collected row as a JSON object, one per line.
pivotedDF.select("a", "b", "c").collect().map { r => println(JSONObject(r.getValuesMap(r.schema.fieldNames))) }
// Exiting paste mode, now interpreting.
+---+------------------------+
|id |col |
+---+------------------------+
|1 |[[a, 1], [b, 2], [c, 3]]|
|2 |[[b, 5], [c, 6], [a, 4]]|
+---+------------------------+
+---+------+
|id |col2 |
+---+------+
|1 |[a, 1]|
|1 |[b, 2]|
|1 |[c, 3]|
|2 |[b, 5]|
|2 |[c, 6]|
|2 |[a, 4]|
+---+------+
+---+---+---+
|id |k |v |
+---+---+---+
|1 |a |1 |
|1 |b |2 |
|1 |c |3 |
|2 |b |5 |
|2 |c |6 |
|2 |a |4 |
+---+---+---+
+---+---+---+---+
|id |a |b |c |
+---+---+---+---+
|1 |1 |2 |3 |
|2 |4 |5 |6 |
+---+---+---+---+
{"a" : 1, "b" : 2, "c" : 3}
{"a" : 4, "b" : 5, "c" : 6}
df: org.apache.spark.sql.DataFrame = [id: int, col: array<struct<_1:string,_2:int>>]
explodedDF: org.apache.spark.sql.DataFrame = [id: int, col2: struct<_1: string, _2: int>]
flattenedDF: org.apache.spark.sql.DataFrame = [id: int, k: string ... 1 more field]
pivotedDF: org.apache.spark.sql.DataFrame = [id: int, a: int ... 2 more fields]
import scala.util.parsing.json.JSONObject
res24: Array[Unit] = Array((), ())
scala>