下面是我展平嵌套 DataFrame 的做法,希望对你有帮助。
代码用 Scala 编写,只需少量改动即可移植到 PySpark。
首先加载题目中提供的测试数据:
// Sample input: a single JSON document with one "template" object whose "header", "paragraph"
// and "image" arrays of structs are flattened below. stripMargin removes everything up to and
// including the leading '|' on each line, so the resulting string is plain JSON.
val data =
"""
|{
| "template": {
| "id": "9",
| "header": [{
| "id": "header",
| "value": "Find the Right Marker for the Job"
| },
| {
| "id": "section1-header",
| "value": "Desk-Style Dry Erase Markers"
| },
| {
| "id": "section2-header",
| "value": "Pen-Style Dry Erase Markers"
| },
| {
| "id": "section3-header",
| "value": "Jumbo Washable Markers"
| }
| ],
| "paragraph": [{
| "id": "description1",
| "value": ["Desk-style wipe off easily "]
| },
| {
| "id": "description2",
| "value": ["Pen-style "]
| },
| {
| "id": "description3",
| "value": ["banners."]
| },
| {
| "id": "description4",
| "value": ["posters"]
| }
| ],
| "image": [{
| "id": "section1-image",
| "assetId": "S"
| },
| {
| "id": "section2-image",
| "assetId": "A"
| },
| {
| "id": "section3-image",
| "assetId": "34"
| },
| {
| "id": "section4-image",
| "assetId": "36"
| }
| ]
| }
|}
""".stripMargin
// Parse the multi-line JSON document into a DataFrame (multiLine lets one JSON value span
// several lines), then print its untruncated contents and its inferred schema.
val jsonLines = Seq(data).toDS()
val df = spark.read
  .option("multiLine", value = true)
  .json(jsonLines)
df.show(truncate = false)
df.printSchema()
/**
* +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |template |
* +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |[[[header, Find the Right Marker for the Job], [section1-header, Desk-Style Dry Erase Markers], [section2-header, Pen-Style Dry Erase Markers], [section3-header, Jumbo Washable Markers]], 9, [[S, section1-image], [A, section2-image], [34, section3-image], [36, section4-image]], [[description1, [Desk-style wipe off easily ]], [description2, [Pen-style ]], [description3, [banners.]], [description4, [posters]]]]|
* +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*
* root
* |-- template: struct (nullable = true)
* | |-- header: array (nullable = true)
* | | |-- element: struct (containsNull = true)
* | | | |-- id: string (nullable = true)
* | | | |-- value: string (nullable = true)
* | |-- id: string (nullable = true)
* | |-- image: array (nullable = true)
* | | |-- element: struct (containsNull = true)
* | | | |-- assetId: string (nullable = true)
* | | | |-- id: string (nullable = true)
* | |-- paragraph: array (nullable = true)
* | | |-- element: struct (containsNull = true)
* | | | |-- id: string (nullable = true)
* | | | |-- value: array (nullable = true)
* | | | | |-- element: string (containsNull = true)
*/
重命名 struct 列中的字段,并把 template 下的字段提升为顶层列(展平 schema):
// Lift the fields of `template` to top-level columns and give every array element
// prefixed field names.
//
// Fields are referenced by *name* through named_struct rather than by casting `template` to a
// renamed struct type. A struct cast matches fields by position, so it would silently mislabel
// data if the input schema's field order changed (JSON inference orders fields alphabetically,
// e.g. `assetId` before `id` in the image structs).
//
// Each paragraph's `value` is an array of strings. array_join keeps all of its fragments
// (taking only element [0] silently drops the rest, and fails on an empty array in ANSI mode).
// NOTE(review): an empty separator is assumed because the sample fragments carry their own
// trailing spaces; adjust it if fragments should be separated.
val p = df.selectExpr(
  "TRANSFORM(template.header, x -> named_struct(" +
    "'header_id', x.id, 'header_value', x.value)) AS header",
  "template.id AS id",
  "TRANSFORM(template.image, x -> named_struct(" +
    "'image_value', x.assetId, 'image_id', x.id)) AS image",
  "TRANSFORM(template.paragraph, x -> named_struct(" +
    "'paragraph_id', x.id, 'paragraph_value', array_join(x.value, ''))) AS paragraph"
)
p.show(false)
p.printSchema()
/**
 * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+--------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
 * |header |id |image |paragraph |
 * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+--------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
 * |[[header, Find the Right Marker for the Job], [section1-header, Desk-Style Dry Erase Markers], [section2-header, Pen-Style Dry Erase Markers], [section3-header, Jumbo Washable Markers]]|9 |[[S, section1-image], [A, section2-image], [34, section3-image], [36, section4-image]]|[[description1, Desk-style wipe off easily ], [description2, Pen-style ], [description3, banners.], [description4, posters]]|
 * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+--------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
 *
 * root
 * |-- header: array (nullable = true)
 * | |-- element: struct (containsNull = false)
 * | | |-- header_id: string (nullable = true)
 * | | |-- header_value: string (nullable = true)
 * |-- id: string (nullable = true)
 * |-- image: array (nullable = true)
 * | |-- element: struct (containsNull = false)
 * | | |-- image_value: string (nullable = true)
 * | | |-- image_id: string (nullable = true)
 * |-- paragraph: array (nullable = true)
 * | |-- element: struct (containsNull = false)
 * | | |-- paragraph_id: string (nullable = true)
 * | | |-- paragraph_value: string (nullable = true)
 */
用 posexplode 把每个数组分别展开成独立的 DataFrame,然后按元素位置(position 列)把它们 join 回同一行:
// Explode each array into its own DataFrame keyed by (id, position), then zip the three back
// together element by element.
// - Joining on `id` as well as `position` keeps elements of different templates apart; joining
//   on `position` alone would cross-match rows whenever `p` holds more than one template.
// - The full outer join keeps the trailing elements of the longer arrays when the three arrays
//   have different lengths (an inner join would silently drop them).
// - orderBy makes the displayed row order deterministic.
val p1 = p.select($"id", posexplode_outer($"header")).selectExpr("id", "pos AS position", "col.*")
val p2 = p.select($"id", posexplode_outer($"image")).selectExpr("id", "pos AS position", "col.*")
val p3 = p.select($"id", posexplode_outer($"paragraph")).selectExpr("id", "pos AS position", "col.*")
p1.join(p2, Seq("id", "position"), "full_outer")
  .join(p3, Seq("id", "position"), "full_outer")
  .orderBy("id", "position")
  .show(false)
/**
 * +---+--------+---------------+---------------------------------+-----------+--------------+------------+---------------------------+
 * |id |position|header_id      |header_value                     |image_value|image_id      |paragraph_id|paragraph_value            |
 * +---+--------+---------------+---------------------------------+-----------+--------------+------------+---------------------------+
 * |9  |0       |header         |Find the Right Marker for the Job|S          |section1-image|description1|Desk-style wipe off easily |
 * |9  |1       |section1-header|Desk-Style Dry Erase Markers     |A          |section2-image|description2|Pen-style                  |
 * |9  |2       |section2-header|Pen-Style Dry Erase Markers      |34         |section3-image|description3|banners.                   |
 * |9  |3       |section3-header|Jumbo Washable Markers           |36         |section4-image|description4|posters                    |
 * +---+--------+---------------+---------------------------------+-----------+--------------+------------+---------------------------+
 */