【Question title】: Convert spark dataframe to nested JSON using pyspark
【Posted】: 2021-12-10 08:59:13
【Question description】:

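I am trying to convert a Spark DataFrame to JSON. The DataFrame has about 1 million rows. My sample code is below, but its performance is very poor. In the desired output, each member_id appears once in the JSON file, and each tag_name appears once under its member_id. Please let me know if there is a faster way to do this.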

Sample code:

import pyspark.sql.functions as ch  # alias inferred from the calls below

result = sdf.groupBy('member_id', 'tag_name') \
    .agg(ch.collect_list(ch.struct('detail_name', 'detail_value')).alias('detail')) \
    .groupBy('member_id').agg(ch.collect_list(ch.struct('tag_name', 'detail')).alias('tag')) \
    .agg(ch.to_json(ch.collect_list(ch.struct('member_id', 'tag'))).alias('result'))

result.show()

detail.csv:

member_id, tag_name, detail_name, detail_value
-------------------------------------------------------
abc123, m1, Service_A, 20
abc123, m1, Service_B, 20
abc123, m2, Service_C, 10
xyz456, m3, Service_A, 5
xyz456, m3, Service_A, 10

Desired output JSON:

{   "member_id": "abc123",
    "tag":[ {"tag_name": "m1",
            "detail":[{ "detail_name": "Service_A",
                        "detail_value": "20"},
                    {   "detail_name": "Service_B",
                        "detail_value": "20"}]},
            {"tag_name": "m2",
            "detail":[{ "detail_name": "Service_C",
                        "detail_value": "10"}]}]},
{   "member_id": "xyz456",
    "tag":[{"tag_name": "m3",
            "detail":[{ "detail_name": "Service_A",
                        "detail_value": "5"},
                      { "detail_name": "Service_A",
                        "detail_value": "10"}]}]}

duplicate.csv:

member_id, tag_name, detail_name, detail_value
-------------------------------------------------------
abc123, m1, problem_no, 'abc123xyz'
abc123, m1, problem_no, 'abc456zzz'
xyz456, m1, problem_no, 'abc123xyz'
xyz456, m1, problem_no, 'abc456zzz'

Duplicated output JSON:

{   "member_id": "abc123",
    "tag":[ {"tag_name": "m1",
            "detail":[{ "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"},
                      { "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"}]}]},
{   "member_id": "xyz456",
    "tag":[ {"tag_name": "m1",
            "detail":[{ "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"},
                      { "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"}]}]}

【Question discussion】:

    Tags: json pyspark apache-spark-sql


    【Solution 1】:

    Would you mind doing it with a SQL statement?

    Build the structs layer by layer, then use the to_json function to generate the JSON string.

    df.createOrReplaceTempView('tmp')
    sql = """
        select to_json(collect_list(struct(member_id,tag))) as member
        from
            (select member_id,collect_list(struct(tag_name,detail)) as tag
            from
                (select member_id,tag_name,collect_list(struct(detail_name,detail_value)) as detail
                from tmp
                group by member_id,tag_name)
            group by member_id)
    """
    df = spark.sql(sql)
    df.show(truncate=False)
    

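    【Discussion】:

    • Looks good overall, but there is a duplication problem and I cannot find the root cause. I have updated the question above with the sample DataFrame and the duplicated result.
    • My test result is correct, with no duplicate data.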