【问题标题】:Pyspark to flatten an array and explode a struct to get the desired outputPyspark 展平数组并分解结构以获得所需的输出
【发布时间】:2020-12-19 07:06:01
【问题描述】:

我有一个具有以下架构的数据:索引属性是 Struct --> 带有数组 --> struct 中的每个数组元素

root
 |-- id_num: string (nullable = true)
 |-- indexes: struct (nullable = true)
 |    |-- customer_rating: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |-- reputation: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |-- visibility: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)

我想将架构转换为以下格式并将数据值放入相应的列中

root
 |-- id_num: string (nullable = true)
 |-- indexes_type: string (nullable = true)    --> this field hold indexes struct elements as a row
 |-- data_sufficiency_indicator: boolean (nullable = true)
 |-- value: double (nullable = true)
 |-- version: string (nullable = true)
 |-- low_value_reason: string (nullable = true)  --> each element in the array becomes a new row

这里是json格式的示例输入数据:

{"id_num":"1234","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.16,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"5678","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.71,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"9876","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":3.06}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}

预期输出

id_num  |   indexes_type    |   version |   data_sufficiency_indicator | value  |   low_value_reason
==============================================================================================================
9999        visibility          2.0             true                    2.16        low scores from reviews_and_visits
9999        visibility          2.0             true                    2.16        low scores from online_presence
9999        customer_rating     2.0             false
9999        reputation          2.0             false
8888        visibility          2.0             true                    2.71        low scores from reviews_and_visits  
8888        visibility          2.0             true                    2.71        low scores from online_presence
8888        customer_rating     2.0             false
7898        visibility          2.0             true                    3.06
7898        customer_rating     2.0             false       
7898        reputation          2.0             false

非常感谢对此用例的任何帮助。也有可能在不硬编码代码中的结构值的情况下获得输出,因为它们可以超出示例中的内容。

【问题讨论】:

  • 您是否能够控制数据加载,即在使用 spark.read.json(..) 时指定架构?
  • @jxc 不确定我是否完全理解您的问题。我想我能做到。现在我正在加载完整的 json 文件并转换为 parquet 格式,然后给我上面的 Schema。你能帮我解决这个问题吗

标签: arrays struct pyspark apache-spark-sql


【解决方案1】:

您可以通过在使用spark.read.json() 加载数据框时显式指定架构,将列indexes 设置为MapType 而不是StructType,见下文:

schema = "id_num string,indexes map<string,array<struct<data_sufficiency_indicator:boolean,low_value_reason:array<string>,value:double,version:string>>>"

df = spark.read.json("/path/to/jsons", schema=schema)

df.printSchema()
root
 |-- id_num: string (nullable = true)
 |-- indexes: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)

然后多次执行 selectexplode_outer/inline_outer 以获得所需的结果:

df_new = df.selectExpr("id_num", "explode_outer(indexes) as (indexes_type, vals)") \
    .selectExpr("*", "inline_outer(vals)") \
    .selectExpr(
        "id_num",
        "indexes_type",
        "version",
        "data_sufficiency_indicator",
        "value",
        "explode_outer(low_value_reason) as low_value_reason"
    )

df_new.show(truncate=False)
+------+---------------+-------+--------------------------+-----+----------------------------------+
|id_num|indexes_type   |version|data_sufficiency_indicator|value|low_value_reason                  |
+------+---------------+-------+--------------------------+-----+----------------------------------+
|1234  |visibility     |2.0    |true                      |2.16 |low scores from reviews_and_visits|
|1234  |visibility     |2.0    |true                      |2.16 |low scores from online_presence   |
|1234  |customer_rating|2.0    |false                     |null |null                              |
|1234  |reputation     |2.0    |false                     |null |null                              |
|5678  |visibility     |2.0    |true                      |2.71 |low scores from reviews_and_visits|
|5678  |visibility     |2.0    |true                      |2.71 |low scores from online_presence   |
|5678  |customer_rating|2.0    |false                     |null |null                              |
|9876  |visibility     |2.0    |true                      |3.06 |null                              |
|9876  |customer_rating|2.0    |false                     |null |null                              |
|9876  |reputation     |2.0    |false                     |null |null                              |
+------+---------------+-------+--------------------------+-----+----------------------------------+

顺便说一句。我在您的示例 JSON 中将 data_id 更改为 id_num,我认为这是您的错字。如果没有,只需将data_id string 添加到架构中,然后使用coalesce(id_num,data_id) 获取最终的id_num 列。

另一方面,您也可以尝试在加载数据帧后不指定架构后使用from_json/to_json函数,参见类似示例here

【讨论】:

  • @aj2713,您介意对我的回答有任何问题进行反馈吗?
猜你喜欢
  • 2021-05-24
  • 1970-01-01
  • 1970-01-01
  • 2020-11-19
  • 2020-12-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多