【问题标题】:Dealing with non-uniform JSON columns in spark dataframe处理 spark 数据框中的非统一 JSON 列
【发布时间】:2020-01-29 23:31:09
【问题描述】:

我想知道将换行符分隔的 JSON 文件读入数据帧的最佳做法是什么。至关重要的是,每条记录中的一个(必需)字段映射到一个不能保证具有相同子字段的对象(即架构在所有记录中是不统一的)。

例如,输入文件可能如下所示:

{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}
{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}
{"id": 3, "type": "baz", "data": {"key3": "moo"}}

在这种情况下,idtypedata 字段将出现在所有记录中,但 data 映射到的结构将具有异构模式。

我有两种方法来处理data 列的不均匀性:

  1. 让 spark 推断架构:
df = spark.read.options(samplingRatio=1.0).json('s3://bucket/path/to/newline_separated_json.txt')

这种方法的明显问题是需要对每条记录进行采样以确定将成为最终模式的字段/模式的超集。考虑到数百万条记录的低 100 条数据集,这可能会非常昂贵?或者……

  1. 告诉 spark 将数据字段强制转换为 JSON 字符串,然后只拥有一个由三个顶级字符串字段组成的架构,idtypedata。在这里,我不确定最好的方法。例如,我假设只是将data 字段声明为如下所示的字符串,因为它没有明确地执行与json.dumps 等效的操作,所以它不起作用?
schema = StructType([
    StructField("id", StringType(), true),
    StructField("type", StringType(), true),
    StructField("data", StringType(), true)
])
df = spark.read.json('s3://bucket/path/to/newline_separated_json.txt', schema=schema)

如果我想避免由选项 1 产生的扫描完整数据集的成本,提取此文件并将 data 字段保留为 JSON 字符串的最佳方法是什么?

谢谢

【问题讨论】:

    标签: python json apache-spark pyspark


    【解决方案1】:

    我认为您的尝试和总体思路是正确的。以下是另外两种基于内置选项的方法,即 get_json_object/from_json 通过数据帧 API 和使用map 转换以及 python 的 json.dumps()json.loads() 通过 RDD API。

    选项 1: get_json_object() / from_json()

    首先让我们尝试使用不需要架构的get_json_object()

    import pyspark.sql.functions as f
    
    df = spark.createDataFrame([
      ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
      ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
      ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
    ], StringType())
    
    df.select(f.get_json_object("value", "$.id").alias("id"), \
              f.get_json_object("value", "$.type").alias("type"), \
               f.get_json_object("value", "$.data").alias("data"))
    
    # +---+----+-----------------------------+
    # |id |type|data                         |
    # +---+----+-----------------------------+
    # |1  |foo |{"key0":"foo","key2":"meh"}  |
    # |2  |bar |{"key2":"poo","key3":"pants"}|
    # |3  |baz |{"key3":"moo"}               |
    # +---+----+-----------------------------+
    

    相反,from_json() 需要架构定义:

    from pyspark.sql.types import StringType, StructType, StructField
    import pyspark.sql.functions as f
    
    df = spark.createDataFrame([
      ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
      ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
      ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
    ], StringType())
    
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("type", StringType(), True),
        StructField("data", StringType(), True)
    ])
    
    df.select(f.from_json("value", schema).getItem("id").alias("id"), \
             f.from_json("value", schema).getItem("type").alias("type"), \
             f.from_json("value", schema).getItem("data").alias("data"))
    
    # +---+----+-----------------------------+
    # |id |type|data                         |
    # +---+----+-----------------------------+
    # |1  |foo |{"key0":"foo","key2":"meh"}  |
    # |2  |bar |{"key2":"poo","key3":"pants"}|
    # |3  |baz |{"key3":"moo"}               |
    # +---+----+-----------------------------+
    

    选项 2:map/RDD API + json.dumps()

    from pyspark.sql.types import StringType, StructType, StructField
    import json
    
    df = spark.createDataFrame([
      '{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}',
      '{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}',
      '{"id": 3, "type": "baz", "data": {"key3": "moo"}}'
    ], StringType())
    
    def from_json(data):
      row = json.loads(data[0])
      return (row['id'], row['type'], json.dumps(row['data']))
    
    json_rdd = df.rdd.map(from_json)
    
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("type", StringType(), True),
        StructField("data", StringType(), True)
    ])
    
    spark.createDataFrame(json_rdd, schema).show(10, False)
    
    # +---+----+--------------------------------+
    # |id |type|data                            |
    # +---+----+--------------------------------+
    # |1  |foo |{"key2": "meh", "key0": "foo"}  |
    # |2  |bar |{"key2": "poo", "key3": "pants"}|
    # |3  |baz |{"key3": "moo"}                 |
    # +---+----+--------------------------------+
    
    

    函数from_json 会将字符串行转换为(id, type, data) 的元组。 json.loads() 将解析 json 字符串并返回一个字典,我们通过该字典生成并返回最终的元组。

    【讨论】:

    • 这看起来很有希望,我想在 rdd 上运行 map 仍然存在开销,但可能比在 samplingRatio=1.0 情况下检查模式的每条记录更便宜。谢谢。
    • 是的,这是真的,因为 map 是一个狭窄的转换,它根本不应该造成开销
    • 嗨@WhitneyZoller 我更新了答案,使用get_json_object(当你不知道架构时)或from_json添加了一个选项
    • 我相信这两种方法之间不应该有任何重要的区别,因为在这两种情况下,json行都需要被解析
    【解决方案2】:

    我建议查看 Rumble 以在 Spark 上查询不适合 DataFrames 的异构 JSON 数据集。这正是它解决的问题。它是免费和开源的。

    例如:

    for $i in json-file("s3://bucket/path/to/newline_separated_json.txt")
    where keys($i.data) = "key2" (: keeping only those objects that have a key2 :)
    group by $type := $i.type
    return {
      "type" : $type,
      "key2-values" : [ $i.data.key2 ]
    }
    

    (免责声明:我是团队的一员。)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-03-01
      • 1970-01-01
      • 2021-09-24
      • 1970-01-01
      • 1970-01-01
      • 2020-11-20
      • 2022-01-23
      相关资源
      最近更新 更多