您的聚合代码看起来没有问题，应该可以正常工作，除非您的原始数据本身有问题（即数据是以字符串形式提供的，而不是 JSON 结构）。下面是我使用您的数据编写的测试代码，其中为数据设置了合适的模式（schema）：
# Sample input: 14 records, each with a year, an id, an area and three visitors
# (AA1, AA2, AA3). Every table row below reads
#   (year, id, area, (AA1 age, AA2 age, AA3 age)).
# Dict keys are inserted as year, id, area, visitor (and id, age for each visitor),
# exactly as in a hand-written literal; the visitor key order matters because
# Spark's map-type inference looks at the first entry of each dict.
raw = [
    {
        'year': year,
        'id': rec_id,
        'area': area,
        'visitor': [
            {'id': visitor_id, 'age': age}
            for visitor_id, age in zip(('AA1', 'AA2', 'AA3'), ages)
        ],
    }
    for year, rec_id, area, ages in (
        (2007, '001', 'GFD', (20, 24, 4)),
        (2009, '045', 'TGH', (20, 24, 5)),
        (2009, '019', 'GFD', (14, 24, 55)),
        (2007, '002', 'GFD', (15, 35, 58)),
        (2007, '003', 'GFD', (16, 24, 23)),
        (2007, '006', 'TGH', (16, 14, 60)),
        (2007, '008', 'TGH', (17, 24, 12)),
        (2008, '010', 'TGH', (18, 16, 23)),
        (2007, '044', 'GFD', (25, 17, 52)),
        (2008, '055', 'TGH', (25, 24, 43)),
        (2007, '032', 'TGH', (22, 24, 77)),
        (2007, '034', 'TGH', (34, 10, 51)),
        (2009, '077', 'GFD', (34, 10, 12)),
        (2007, '025', 'GFD', (34, 24, 10)),
    )
]
# Build the DataFrame with an explicit schema instead of letting PySpark infer it
# from the dicts. Dict-based inference is problematic in several ways:
#   * many PySpark releases warn that inferring a schema from dicts is deprecated;
#   * it does not keep the dicts' key order, which is why the original needed a
#     reordering `.select(...)`;
#   * it takes the map value type from the first visitor entry ('id' -> 'AA1'),
#     which silently turns every `age` into a string.
# The declared schema reproduces exactly the schema printed below, in the declared
# column order, but makes those choices explicit.
# NOTE: with map<string,string> elements, `age` is stored as text (e.g. "20").
# Use array<struct<id:string,age:int>> instead if numeric ages are needed; that
# also changes the outputs shown further down.
# Assumes an active SparkSession named `spark`.
df = spark.createDataFrame(
    raw,
    schema="year bigint, id string, area string, visitor array<map<string,string>>",
)
df.printSchema()
# root
# |-- year: long (nullable = true)
# |-- id: string (nullable = true)
# |-- area: string (nullable = true)
# |-- visitor: array (nullable = true)
# | |-- element: map (containsNull = true)
# | | |-- key: string
# | | |-- value: string (valueContainsNull = true)
agg = df.groupBy("area").agg(F.collect_list("visitor").alias("visitor_flatten"))
agg.show(10, False)
# +----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |area|visitor_flatten |
# +----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |GFD |[[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 4, id -> AA3}], [{age -> 14, id -> AA1}, {age -> 24, id -> AA2}, {age -> 55, id -> AA3}], [{age -> 15, id -> AA1}, {age -> 35, id -> AA2}, {age -> 58, id -> AA3}], [{age -> 16, id -> AA1}, {age -> 24, id -> AA2}, {age -> 23, id -> AA3}], [{age -> 25, id -> AA1}, {age -> 17, id -> AA2}, {age -> 52, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 12, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 24, id -> AA2}, {age -> 10, id -> AA3}]]|
# |TGH |[[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 5, id -> AA3}], [{age -> 16, id -> AA1}, {age -> 14, id -> AA2}, {age -> 60, id -> AA3}], [{age -> 17, id -> AA1}, {age -> 24, id -> AA2}, {age -> 12, id -> AA3}], [{age -> 18, id -> AA1}, {age -> 16, id -> AA2}, {age -> 23, id -> AA3}], [{age -> 25, id -> AA1}, {age -> 24, id -> AA2}, {age -> 43, id -> AA3}], [{age -> 22, id -> AA1}, {age -> 24, id -> AA2}, {age -> 77, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 51, id -> AA3}]]|
# +----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
agg.printSchema()
# root
# |-- area: string (nullable = true)
# |-- visitor_flatten: array (nullable = false)
# | |-- element: array (containsNull = false)
# | | |-- element: map (containsNull = true)
# | | | |-- key: string
# | | | |-- value: string (valueContainsNull = true)
# Explode the nested list: each element of `visitor_flatten` (one original row's
# visitor array) becomes its own row in a column named `test`.
# Because collect_list nested the arrays, each exploded value is still an array
# of maps, not a single visitor map (see the schema below).
(
    agg
    .select(F.explode(F.col("visitor_flatten")).alias("test"))
    .show(n=10, truncate=False)
)
# +------------------------------------------------------------------------+
# |test |
# +------------------------------------------------------------------------+
# |[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 4, id -> AA3}] |
# |[{age -> 14, id -> AA1}, {age -> 24, id -> AA2}, {age -> 55, id -> AA3}]|
# |[{age -> 15, id -> AA1}, {age -> 35, id -> AA2}, {age -> 58, id -> AA3}]|
# |[{age -> 16, id -> AA1}, {age -> 24, id -> AA2}, {age -> 23, id -> AA3}]|
# |[{age -> 25, id -> AA1}, {age -> 17, id -> AA2}, {age -> 52, id -> AA3}]|
# |[{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 12, id -> AA3}]|
# |[{age -> 34, id -> AA1}, {age -> 24, id -> AA2}, {age -> 10, id -> AA3}]|
# |[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 5, id -> AA3}] |
# |[{age -> 16, id -> AA1}, {age -> 14, id -> AA2}, {age -> 60, id -> AA3}]|
# |[{age -> 17, id -> AA1}, {age -> 24, id -> AA2}, {age -> 12, id -> AA3}]|
# +------------------------------------------------------------------------+
# only showing top 10 rows
agg.select(F.explode(F.col("visitor_flatten")).alias("test")).printSchema()
# root
# |-- test: array (nullable = false)
# | |-- element: map (containsNull = true)
# | | |-- key: string
# | | |-- value: string (valueContainsNull = true)