【问题标题】:How to iteratively explode a nested json with index using posexplode_outer如何使用poseexplode_outer迭代地分解带有索引的嵌套json
【发布时间】:2020-07-06 20:52:45
【问题描述】:

我有一个嵌套的 json,我需要使用posexplode_outer 函数来分解它

def flatten_df(nested_df): 
    for column in nested_df.columns:
        array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
    for column in array_cols:
        nested_df=nested_df.select('*',F.posexplode_outer(nested_df["`"+column+"`"]).alias("position",column))
    
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    
    if len(nested_cols) == 0:
        return nested_df
    
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    
    flat_df = nested_df.select(flat_cols +
                            [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    
    return flatten_df(flat_df)

我收到不明确的列名错误

遇到错误:“参考 不明确,可以 是:, .;"

谁能帮我解决这个错误。该函数的输入是如下所示的 JSON,它存储在 Spark 数据帧中。 JSON 是高度嵌套的。

{
        "template": 
            {
                "id": "9",
                "header": [
                    {
                        "id": "header",
                        "value": "Find the Right Marker for the Job"
                    },
                    {
                        "id": "section1-header",
                        "value": "Desk-Style Dry Erase Markers"
                    },
                    {
                        "id": "section2-header",
                        "value": "Pen-Style Dry Erase Markers"
                    },
                    {
                        "id": "section3-header",
                        "value": "Jumbo Washable Markers"
                    }
                ],
                "paragraph": [
                    {
                        "id": "description1",
                        "value": ["Desk-style wipe off easily "] 
                    },
                    {
                        "id": "description2",
                        "value": ["Pen-style "]
                    },
                    {
                        "id": "description3",
                        "value": ["banners."] 
                    },
                    {
                        "id": "description4",
                        "value": ["posters"] 
                    }
                ],
                "image": [
                    {
                        "id": "section1-image",
                        "assetId": "S"
                    },
                    {
                        "id": "section2-image",
                        "assetId": "A" 
                    },
                    {
                        "id": "section3-image",
                        "assetId": "34"
                    },
                    {
                        "id": "section4-image",
                        "assetId": "36"
                    }
                ]
            }
            })

上述示例 JSON 的预期输出是

【问题讨论】:

  • 您能否添加示例输入数据集和预期输出。
  • 用示例输入和预期输出编辑了问题

标签: python json pyspark apache-spark-sql


【解决方案1】:

您的架构可能包含在展平后名称重复的列。 尝试在扁平化的 df 上调用打印模式。

当数据框中的列名不唯一并且分析器无法确定您引用的是哪一列时,就会出现该错误消息。

在这种情况下,您要么需要解析生成数据框的方法,要么需要按位置访问列并为它们取别名,以便它们具有唯一的名称。

【讨论】:

  • 感谢@milos。相同的输入文件适用于“explode_outer”函数。但它不适用于poseexplode_outer。我不知道poseexplode_outer和explode_outer有什么不同
【解决方案2】:

下面是我尝试展平嵌套的 DF,也许这有帮助 -

用 scala 编写,但可以在 pyspark 中实现,改动很小

加载提供的测试数据

 val data =
      """
        |{
        |   "template": {
        |       "id": "9",
        |       "header": [{
        |               "id": "header",
        |               "value": "Find the Right Marker for the Job"
        |           },
        |           {
        |               "id": "section1-header",
        |               "value": "Desk-Style Dry Erase Markers"
        |           },
        |           {
        |               "id": "section2-header",
        |               "value": "Pen-Style Dry Erase Markers"
        |           },
        |           {
        |               "id": "section3-header",
        |               "value": "Jumbo Washable Markers"
        |           }
        |       ],
        |       "paragraph": [{
        |               "id": "description1",
        |               "value": ["Desk-style wipe off easily "]
        |           },
        |           {
        |               "id": "description2",
        |               "value": ["Pen-style "]
        |           },
        |           {
        |               "id": "description3",
        |               "value": ["banners."]
        |           },
        |           {
        |               "id": "description4",
        |               "value": ["posters"]
        |           }
        |       ],
        |       "image": [{
        |               "id": "section1-image",
        |               "assetId": "S"
        |           },
        |           {
        |               "id": "section2-image",
        |               "assetId": "A"
        |           },
        |           {
        |               "id": "section3-image",
        |               "assetId": "34"
        |           },
        |           {
        |               "id": "section4-image",
        |               "assetId": "36"
        |           }
        |       ]
        |   }
        |}
      """.stripMargin
    val df = spark.read.option("multiLine", true)
      .json(Seq(data).toDS())

    df.show(false)
    df.printSchema()
    /**
      * +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |template                                                                                                                                                                                                                                                                                                                                                                                                                    |
      * +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |[[[header, Find the Right Marker for the Job], [section1-header, Desk-Style Dry Erase Markers], [section2-header, Pen-Style Dry Erase Markers], [section3-header, Jumbo Washable Markers]], 9, [[S, section1-image], [A, section2-image], [34, section3-image], [36, section4-image]], [[description1, [Desk-style wipe off easily ]], [description2, [Pen-style ]], [description3, [banners.]], [description4, [posters]]]]|
      * +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      *
      * root
      * |-- template: struct (nullable = true)
      * |    |-- header: array (nullable = true)
      * |    |    |-- element: struct (containsNull = true)
      * |    |    |    |-- id: string (nullable = true)
      * |    |    |    |-- value: string (nullable = true)
      * |    |-- id: string (nullable = true)
      * |    |-- image: array (nullable = true)
      * |    |    |-- element: struct (containsNull = true)
      * |    |    |    |-- assetId: string (nullable = true)
      * |    |    |    |-- id: string (nullable = true)
      * |    |-- paragraph: array (nullable = true)
      * |    |    |-- element: struct (containsNull = true)
      * |    |    |    |-- id: string (nullable = true)
      * |    |    |    |-- value: array (nullable = true)
      * |    |    |    |    |-- element: string (containsNull = true)
      */

重命名结构 col 并展平架构

    val p = df.withColumn("template", col("template")
      .cast(
        """
          |struct<
          |header:array<struct<header_id:string, header_value:string>>,
          |id:string,
          |image:array<struct<image_value:string, image_id:string>>,
          |paragraph:array<struct<paragraph_id:string, paragraph_value:array<string>>>
          |>
        """.stripMargin.replaceAll("\n", "")))
      .selectExpr("template.*")
      .withColumn("paragraph", expr("TRANSFORM(paragraph, x -> named_struct('paragraph_id', x.paragraph_id, " +
        "'paragraph_value', x.paragraph_value[0]))"))

    p.show(false)
    p.printSchema()

    /**
      * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+--------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
      * |header                                                                                                                                                                                   |id |image                                                                                 |paragraph                                                                                                                   |
      * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+--------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
      * |[[header, Find the Right Marker for the Job], [section1-header, Desk-Style Dry Erase Markers], [section2-header, Pen-Style Dry Erase Markers], [section3-header, Jumbo Washable Markers]]|9  |[[S, section1-image], [A, section2-image], [34, section3-image], [36, section4-image]]|[[description1, Desk-style wipe off easily ], [description2, Pen-style ], [description3, banners.], [description4, posters]]|
      * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+--------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
      *
      * root
      * |-- header: array (nullable = true)
      * |    |-- element: struct (containsNull = true)
      * |    |    |-- header_id: string (nullable = true)
      * |    |    |-- header_value: string (nullable = true)
      * |-- id: string (nullable = true)
      * |-- image: array (nullable = true)
      * |    |-- element: struct (containsNull = true)
      * |    |    |-- image_value: string (nullable = true)
      * |    |    |-- image_id: string (nullable = true)
      * |-- paragraph: array (nullable = true)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- paragraph_id: string (nullable = true)
      * |    |    |-- paragraph_value: string (nullable = true)
      */

posexplode每个数组作为不同的数据框,然后使用position列加入

    val p1 = p.select($"id", posexplode_outer($"header")).selectExpr("col.*", "pos as position", "id")
    val p2 = p.select($"id", posexplode_outer($"image")).selectExpr("col.*", "pos as position")
    val p3 = p.select($"id", posexplode_outer($"paragraph")).selectExpr("col.*", "pos as position")

    p1.join(p2, "position")
        .join(p3, "position")
        .show(false)

    /**
      * +--------+---------------+---------------------------------+---+-----------+--------------+------------+---------------------------+
      * |position|header_id      |header_value                     |id |image_value|image_id      |paragraph_id|paragraph_value            |
      * +--------+---------------+---------------------------------+---+-----------+--------------+------------+---------------------------+
      * |2       |section2-header|Pen-Style Dry Erase Markers      |9  |34         |section3-image|description3|banners.                   |
      * |0       |header         |Find the Right Marker for the Job|9  |S          |section1-image|description1|Desk-style wipe off easily |
      * |1       |section1-header|Desk-Style Dry Erase Markers     |9  |A          |section2-image|description2|Pen-style                  |
      * |3       |section3-header|Jumbo Washable Markers           |9  |36         |section4-image|description4|posters                    |
      * +--------+---------------+---------------------------------+---+-----------+--------------+------------+---------------------------+
      */

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-27
    相关资源
    最近更新 更多