【Question title】: Creating hierarchical JSON in Spark
【Posted】: 2020-08-03 16:44:16
【Question】:

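I have a Spark DataFrame that I need to write to MongoDB. I would like to know how to write some of its columns as nested/hierarchical JSON in MongoDB. Say the DataFrame has 6 columns: col1, col2, ..., col5, col6. I want col1, col2 and col3 at the first level of the hierarchy and the remaining columns, col4 through col6, at the second level, like this: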

{
    "col1": 123,
    "col2": "abc",
    "col3": 45,
    "fields": {
        "col4": "ert",
        "col5": 45,
        "col6": 56
    }
}

How can I achieve this in PySpark?

【Comments】:

    Tags: mongodb apache-spark pyspark


    【Solution 1】:

    In this case, use the built-in functions to_json + struct.

    Example:

    df.show()                                                                                                         
    #+----+----+----+----+----+----+
    #|col1|col2|col3|col4|col5|col6|
    #+----+----+----+----+----+----+
    #| 123| abc|  45| ert|  45|  56|
    #+----+----+----+----+----+----+
    
    from pyspark.sql.functions import struct, to_json
    df.withColumn("jsn",to_json(struct("col1","col2","col3",struct("col4","col5","col6").alias("fields")))).show(10,False)
    #+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
    #|col1|col2|col3|col4|col5|col6|jsn                                                                                    |
    #+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
    #|123 |abc |45  |ert |45  |56  |{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}|
    #+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
    
    cols=df.columns
    
    df.withColumn("jsn",to_json(struct("col1","col2","col3",struct("col4","col5","col6").alias("fields")))).drop(*cols).show(10,False)
    #+---------------------------------------------------------------------------------------+
    #|jsn                                                                                    |
    #+---------------------------------------------------------------------------------------+
    #|{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}|
    #+---------------------------------------------------------------------------------------+
    
    #using toJSON
    df.withColumn("jsn",struct("col1","col2","col3",struct("col4","col5","col6").alias("fields"))).drop(*cols).toJSON().collect()
    #[u'{"jsn":{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}}']
    
    #to write as json file
    df.withColumn("jsn",struct("col1","col2","col3",struct("col4","col5","col6").alias("fields"))).\
    drop(*cols).\
    write.\
    format("json").\
    save("<path>")
    

    Update:

    The jsn column represented as a JSON struct:

    df.withColumn("jsn",struct("col1","col2","col3",struct("col4","col5","col6").alias("fields"))).drop(*cols).printSchema()
    #root
    # |-- jsn: struct (nullable = false)
    # |    |-- col1: string (nullable = true)
    # |    |-- col2: string (nullable = true)
    # |    |-- col3: string (nullable = true)
    # |    |-- fields: struct (nullable = false)
    # |    |    |-- col4: string (nullable = true)
    # |    |    |-- col5: string (nullable = true)
    # |    |    |-- col6: string (nullable = true)
    

    【Comments】:

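    • I tried this, but when it is inserted into MongoDB, the second level of the hierarchy is converted to a string instead of JSON.
    • @Jawar, check my updated answer: without the to_json function, we can represent the jsn column as a JSON struct. Try loading that into MongoDB.
    • What does drop(*cols) do?
    • @Jawar, it drops every column in the cols list from the DataFrame, so we don't have to name each column individually. (We defined cols=df.columns.)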