【问题标题】:parsing a JSON string Pyspark dataframe column that has string of array in one of the columns解析 JSON 字符串 Pyspark 数据框列,其中一列中有数组字符串
【发布时间】:2019-10-17 14:22:15
【问题描述】:

我正在尝试读取 JSON 文件并解析“jsonString”以及将数组包含到 pyspark 数据帧中的基础字段。

这是json文件的内容。

[{"jsonString": "{\"uid\":\"value1\",\"adUsername\":\"value3\",\"courseCertifications\":[{\"uid\":\"value2\",\"courseType\":\"TRAINING\"},{\"uid\":\"TEST\",\"courseType\":\"TRAINING\"}],\"modifiedBy\":\"value4\"}","transactionId": "value5", "tableName": "X"},
 {"jsonString": "{\"uid\":\"value11\",\"adUsername\":\"value13\",\"modifiedBy\":\"value14\"}","transactionId": "value15", "tableName": "X1"},
 {"jsonString": "{\"uid\":\"value21\",\"adUsername\":\"value23\",\"modifiedBy\":\"value24\"}","transactionId": "value25", "tableName": "X2"}]

我能够解析字符串“jsonString”的内容并使用以下逻辑选择所需的列

df = spark.read.json('path.json',multiLine=True)
df = df.withColumn('courseCertifications', explode(array(get_json_object(df['jsonString'],'$.courseCertifications'))))

现在我的最终目标是从“courseCertifications”解析字段“courseType”并为每个实例创建一行。

我正在使用以下逻辑来获取“courseType”

df = df.withColumn('new',get_json_object(df.courseCertifications, '$[*].courseType'))

我能够获取“courseType”的内容,但作为字符串,如下所示

[Row(new=u'["TRAINING","TRAINING"]')]

我的最终目标是创建一个包含 transactionId、jsonString.uid、jsonString.adUsername、jsonString.courseCertifications.uid、jsonString.courseCertifications.courseType 列的数据框

  • 我需要保留所有行并为每个 courseCertifications.uid/courseCertifications.courseType 的数组实例创建多行。

【问题讨论】:

    标签: arrays json apache-spark pyspark apache-spark-sql


    【解决方案1】:

    解决您的问题的一种优雅方式是创建 json 字符串的架构,然后使用 from_json 函数对其进行解析

    import pyspark.sql.functions as f
    from pyspark.shell import spark
    from pyspark.sql.types import ArrayType, StringType, StructType, StructField
    
    df = spark.read.json('your_path', multiLine=True)
    schema = StructType([
        StructField('uid', StringType()),
        StructField('adUsername', StringType()),
        StructField('modifiedBy', StringType()),
        StructField('courseCertifications', ArrayType(
            StructType([
                StructField('uid', StringType()),
                StructField('courseType', StringType())
            ])
        ))
    ])
    
    df = df \
        .withColumn('tmp', f.from_json(df.jsonString, schema)) \
        .withColumn('adUsername', f.col('tmp').adUsername) \
        .withColumn('uid', f.col('tmp').uid) \
        .withColumn('modifiedBy', f.col('tmp').modifiedBy) \
        .withColumn('tmp', f.explode(f.col('tmp').courseCertifications)) \
        .withColumn('course_uid', f.col('tmp').uid) \
        .withColumn('course_type', f.col('tmp').courseType) \
        .drop('jsonString', 'tmp')
    df.show()
    

    输出:

    +-------------+------+----------+----------+----------+-----------+
    |transactionId|uid   |adUsername|modifiedBy|course_uid|course_type|
    +-------------+------+----------+----------+----------+-----------+
    |value5       |value1|value3    |value4    |value2    |TRAINING   |
    |value5       |value1|value3    |value4    |TEST      |TRAINING   |
    +-------------+------+----------+----------+----------+-----------+
    

    【讨论】:

    • 谢谢@Kafels!这有效,而不是自定义模式,我正在执行以下操作来获取“jsonString”的结构,因为它可能包含不同的列,具体取决于 JSON 文件。 json_schema = spark.read.json(df.rdd.map(lambda row: row.jsonString)).schema df = df.withColumn('jsonString', from_json(df['jsonString'], json_schema)) 请分享你的想法。
    • 您的建议代码非常好,因为不需要映射架构总是有一个新的或从 JSON 文件中删除的列
    • 感谢@Kafels,我注意到只有具有“courseCertifications”字段的记录被保留,所有其他记录都被删除。如果任何行不存在“courseCertifications”字段,我的要求是获取所有记录并填充 NULL。
    • 在您的问题中编辑 JSON 并添加更多值进行调试
    • 更新了更多值的问题,并添加了更多关于 o/p Dataframe 的信息。
    猜你喜欢
    • 2017-04-27
    • 2020-12-28
    • 2018-05-17
    • 2023-03-30
    • 2018-02-20
    • 1970-01-01
    • 2022-06-14
    • 1970-01-01
    • 2016-05-11
    相关资源
    最近更新 更多