【Question Title】: Parse a JSON column in a Spark DataFrame using Spark
【Posted】: 2021-09-24 09:23:19
【Question】:

Input:

caseid object_value
1 [{'dummyAcc':'12346','accountRequest':{'schemeCode':'ZEROQ1', 'CCZ':'SGD'}}]
2 [{'dummyAcc':'12347','accountRequest':{'schemeCode':'ZEROQ2', 'CCZ':'SGD'}}]
3 [{'dummyAcc':'12348','accountRequest':{'schemeCode':'ZEROQ5', 'CCZ':'SGD'}}]
4 [{'dummyAcc':'12349','accountRequest':{'schemeCode':'ZEROQ', 'CCZ':'SGD'}}]
5 [{'dummyAcc':'12350','accountRequest':{'schemeCode':'ZEROQ', 'CCZ':'SGD'}}]

Output:

caseid schemeCode CCZ
1 ZEROQ1 SGD
2 ZEROQ2 SGD
3 ZEROQ5 SGD
4 ZEROQ SGD
5 ZEROQ SGD

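Please guide me on how to produce this output in Spark. I can do it in Python with a small sample of data, but because of the data volume in production it needs to be done in Spark. Thanks in advance.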

【Question Comments】:

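  • If the sample you've provided here is representative of your use case, the fastest approach is PySpark's native regexp_extract function. If your data is very complex, you may need to write a UDF that converts each value to a dictionary and searches it.
  • Does regexp_extract work on JSON?
  • Sure. JSON is just a string. Here is an example of using it to find the first word after a particular search term: stackoverflow.com/a/46547701/1884171
  • The problem is that your data is not valid JSON. Valid JSON uses double quotes.
  • @Steven, sorry, the data is actually enclosed in double quotes.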
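To make @Steven's point concrete, here is a plain-Python check (not Spark) on one row of the sample exactly as it was first posted: the standard json module rejects the single-quoted keys, while ast.literal_eval accepts the same text because it also happens to be a valid Python literal. This is only an illustration; in Spark the corresponding switch is the allowSingleQuotes option of from_json.

```python
import ast
import json

# One object_value exactly as first posted (single quotes, wrapped in a list)
raw = "[{'dummyAcc':'12346','accountRequest':{'schemeCode':'ZEROQ1', 'CCZ':'SGD'}}]"

# Strict JSON requires double-quoted strings, so json.loads raises an error
try:
    json.loads(raw)
    strict_ok = True
except json.JSONDecodeError:
    strict_ok = False

# The same text is a valid Python literal: a list holding one dict
parsed = ast.literal_eval(raw)
record = parsed[0]["accountRequest"]

print(strict_ok)                            # False
print(record["schemeCode"], record["CCZ"])  # ZEROQ1 SGD
```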

Tags: json dataframe apache-spark parsing pyspark


【Solution 1】:

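A colleague once told me that regex_extract is faster than parsing JSON, and I always believed it... until today, when I decided to run some timing experiments comparing it with the other two posted solutions, which use get_json_object and from_json.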

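The short answer is that all three are comparable, even when we complicate the JSON by adding thousands of extra K:V pairs. In these tests, the regex_extract approach was actually consistently a bit slower.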

Setup: show that each method works

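import pandas as pd
import pyspark.sql.functions as fun
import pyspark.sql.types as t
from pyspark.sql import SparkSession

# Reuse the active session if there is one (e.g. in a notebook)
spark = SparkSession.builder.getOrCreate()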

case_ids = range(1,6)
data =  [
  '{"dummyAcc":"12346","accountRequest":{"schemeCode":"ZEROQ1", "CCZ":"SGD"}}',
  '{"dummyAcc":"12347","accountRequest":{"schemeCode":"ZEROQ2", "CCZ":"SGD"}}',
  '{"dummyAcc":"12348","accountRequest":{"schemeCode":"ZEROQ5", "CCZ":"SGD"}}',
  '{"dummyAcc":"12349","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}',
  '{"dummyAcc":"12350","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}'
]

df = spark.createDataFrame(pd.DataFrame({"caseid": case_ids, "object_value": data}))

##
# fun.from_json
##
schm = t.StructType(
    [
        t.StructField("dummyAcc", t.StringType()),
        t.StructField(
            "accountRequest",
            t.StructType(
                [
                    t.StructField("schemeCode", t.StringType()),
                    t.StructField("CCZ", t.StringType()),
                ]
            ),
        ),
    ]
)

def run_from_json(df):
  return df.withColumn("object_value", fun.from_json("object_value", schm, options={"allowSingleQuotes": "true"}))\
          .select(
            "caseid",
            "object_value.accountRequest.schemeCode",
            "object_value.accountRequest.CCZ",
        )

##
# get_json
##

def run_get_json(df):
  return df.select('caseid', 
                    fun.get_json_object('object_value', '$.accountRequest.schemeCode').alias('schemeCode'),
                    fun.get_json_object('object_value', '$.accountRequest.CCZ').alias('CCZ'))


##
# regexp_extract
##

def run_regexp_extract(df):
  # Raw strings avoid the invalid "\w" escape warning; group 3 captures the value after the key
  return df.withColumn("schemeCode", fun.regexp_extract(fun.col("object_value"), r'(.)("schemeCode":")(\w+)', 3))\
    .withColumn("CCZ", fun.regexp_extract(fun.col("object_value"), r'(.)("CCZ":")(\w+)', 3))\
    .select("caseid", "schemeCode", "CCZ")

##
# Test them out
##

print("from_json")
run_from_json(df).show(truncate=False)

print("get_json")
run_get_json(df).show(truncate=False)

print("regexp_extract")
run_regexp_extract(df).show(truncate=False)


from_json
+------+----------+---+
|caseid|schemeCode|CCZ|
+------+----------+---+
|1     |ZEROQ1    |SGD|
|2     |ZEROQ2    |SGD|
|3     |ZEROQ5    |SGD|
|4     |ZEROQ     |SGD|
|5     |ZEROQ     |SGD|
+------+----------+---+

get_json
+------+----------+---+
|caseid|schemeCode|CCZ|
+------+----------+---+
|1     |ZEROQ1    |SGD|
|2     |ZEROQ2    |SGD|
|3     |ZEROQ5    |SGD|
|4     |ZEROQ     |SGD|
|5     |ZEROQ     |SGD|
+------+----------+---+


regexp_extract
+------+----------+---+
|caseid|schemeCode|CCZ|
+------+----------+---+
|1     |ZEROQ1    |SGD|
|2     |ZEROQ2    |SGD|
|3     |ZEROQ5    |SGD|
|4     |ZEROQ     |SGD|
|5     |ZEROQ     |SGD|
+------+----------+---+
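Before timing anything, it can help to confirm outside Spark that a real parser and the regex agree on the sample. The sketch below uses Python's json and re modules as stand-ins: Spark's regexp_extract uses Java regular expressions and its JSON functions use their own parser, but for these simple patterns the group numbering is the same. The helper names by_parser and by_regex are hypothetical.

```python
import json
import re

data = [
    '{"dummyAcc":"12346","accountRequest":{"schemeCode":"ZEROQ1", "CCZ":"SGD"}}',
    '{"dummyAcc":"12347","accountRequest":{"schemeCode":"ZEROQ2", "CCZ":"SGD"}}',
    '{"dummyAcc":"12348","accountRequest":{"schemeCode":"ZEROQ5", "CCZ":"SGD"}}',
    '{"dummyAcc":"12349","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}',
    '{"dummyAcc":"12350","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}',
]

# Same patterns as run_regexp_extract; group 3 is the captured value
SCHEME_RE = re.compile(r'(.)("schemeCode":")(\w+)')
CCZ_RE = re.compile(r'(.)("CCZ":")(\w+)')

def by_parser(s):
    """Hypothetical helper: full JSON parse, then follow accountRequest.*"""
    req = json.loads(s)["accountRequest"]
    return req["schemeCode"], req["CCZ"]

def by_regex(s):
    """Hypothetical helper: mimic regexp_extract(..., 3), '' when nothing matches."""
    m1, m2 = SCHEME_RE.search(s), CCZ_RE.search(s)
    return (m1.group(3) if m1 else ""), (m2.group(3) if m2 else "")

parsed = [by_parser(s) for s in data]
extracted = [by_regex(s) for s in data]
print(parsed == extracted)  # True
```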

Timing part 1 -- short JSON. I measured the wall-clock time of running many iterations with the compact default JSON defined above.

import time

def time_run_method(df, n_it, meth, meth_name):
  # count() forces Spark to actually execute the lazy transformation
  t0 = time.time()
  for i in range(n_it):
    meth(df).count()
  td = time.time() - t0
  print(meth_name)
  print("Time to count %d iterations: %s [sec]" % (n_it, "{:,}".format(td)))

for m, n in zip([run_from_json, run_get_json, run_regexp_extract], ["from_json", "get_json", "regexp_extract"]):
  time_run_method(df, 200, m, n)


from_json
Time to count 200 iterations: 15.918861389160156 [sec]

get_json
Time to count 200 iterations: 15.668830871582031 [sec]

regexp_extract
Time to count 200 iterations: 17.539576292037964 [sec]
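The timing loop above can also be written as a small reusable helper. This sketch is plain Python and assumes nothing about Spark: it uses time.perf_counter (a monotonic clock intended for measuring intervals), takes the label as a parameter instead of reading a global, and returns the elapsed seconds so results can be collected rather than only printed. The name time_callable is hypothetical; with Spark, the callable would be something like lambda: run_from_json(df).count().

```python
import time

def time_callable(fn, n_it, label):
    """Run fn() n_it times and return the total elapsed seconds (hypothetical helper)."""
    t0 = time.perf_counter()
    for _ in range(n_it):
        fn()
    elapsed = time.perf_counter() - t0
    print("%s: %d iterations in %.3f sec" % (label, n_it, elapsed))
    return elapsed

# Usage with a trivial stand-in workload; with Spark this might be
# time_callable(lambda: run_from_json(df).count(), 200, "from_json")
calls = []
elapsed = time_callable(lambda: calls.append(1), 200, "dummy")
```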

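Timing part 2 -- long JSON. I added 2000 key-value pairs to the JSON to see whether the extra overhead of deserializing them would change things. It didn't. Perhaps the structure is simple enough that the internal parser can skip the extra keys cheaply, or, given how flat the structure is, they just don't add much overhead. I don't know.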

import json

# 2000 extra key-value pairs ({"0": 0, "1": 1, ...}) to bloat each record
cruft = json.dumps({k:v for k,v in enumerate(range(2000))})

data = [
  '{ "cruft": %s, "dummyAcc":"12346","accountRequest":{"schemeCode":"ZEROQ1", "CCZ":"SGD"}}' % cruft,
  '{ "cruft": %s, "dummyAcc":"12347","accountRequest":{"schemeCode":"ZEROQ2", "CCZ":"SGD"}}' % cruft,
  '{ "cruft": %s, "dummyAcc":"12348","accountRequest":{"schemeCode":"ZEROQ5", "CCZ":"SGD"}}' % cruft,
  '{ "cruft": %s, "dummyAcc":"12349","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}' % cruft,
  '{ "cruft": %s, "dummyAcc":"12350","accountRequest":{"schemeCode":"ZEROQ", "CCZ":"SGD"}}' % cruft
]

df2 = spark.createDataFrame(pd.DataFrame({"caseid": case_ids, "object_value": data}))

for m, n in zip([run_from_json, run_get_json, run_regexp_extract], ["from_json", "get_json", "regexp_extract"]):
  time_run_method(df2, 200, m, n)


    
from_json
Time to count 200 iterations: 16.005220413208008 [sec]
get_json
Time to count 200 iterations: 15.788024187088013 [sec]
regexp_extract
Time to count 200 iterations: 16.81353187561035 [sec]
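One caveat to keep in mind when choosing the regex route for larger payloads: regexp_extract matches text, not structure, so it returns the first occurrence of "schemeCode":" wherever it appears, whereas a path-based lookup only looks under accountRequest. The plain-Python sketch below uses json and re as stand-ins for Spark's parser and Java regex, on a hypothetical record in which an unrelated nested object (named legacy here, purely for illustration) also carries a schemeCode key.

```python
import json
import re

# Hypothetical record: 2000 cruft keys plus an unrelated object that also has schemeCode
record = json.dumps({
    "cruft": {str(k): k for k in range(2000)},
    "legacy": {"schemeCode": "OLD01"},
    "dummyAcc": "12346",
    "accountRequest": {"schemeCode": "ZEROQ1", "CCZ": "SGD"},
}, separators=(",", ":"))

# Regex: first textual match anywhere in the string
m = re.search(r'(.)("schemeCode":")(\w+)', record)
regex_value = m.group(3)

# Parser: follow the path accountRequest.schemeCode
path_value = json.loads(record)["accountRequest"]["schemeCode"]

print(regex_value, path_value)  # OLD01 ZEROQ1
```

If the real payload never repeats a key name at different depths, both approaches return the same value; the difference only matters when it does.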

【Comments】:

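  • Amazing answer, I never stopped to think about which one performs best.
  • It just takes a slow day to cultivate some curiosity :-)
  • Hi @andrew, thank you so much for the great explanation, I learned something today. I'll use regex_extract, since the real data has 500 K:V pairs; I just took a sample and posted it here.
  • Two questions: 1. What is the 3 at the end of the regex_extract call? 2. If the scheme code were "schemeCode":"ZEROQ1 account (ABCD)", how would the regex change?
  • The 3 is the group index. regexp_extract returns the text matched by the capture group with that index; here we want the third group, (\w+), which holds the value.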
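To illustrate both points from the thread: the last argument of regexp_extract selects which capture group to return, and \w+ stops at the first space, so a value like ZEROQ1 account (ABCD) would be cut down to ZEROQ1. One option (a suggestion, not from the original answer) is to capture everything up to the closing quote with [^"]*. The sketch uses Python's re, which behaves like Java regex (used by Spark) for these patterns; in PySpark the same pattern string would be passed to fun.regexp_extract(..., 3). It assumes the value contains no escaped double quotes.

```python
import re

s = '{"dummyAcc":"12346","accountRequest":{"schemeCode":"ZEROQ1 account (ABCD)", "CCZ":"SGD"}}'

# Group 1 = (.), group 2 = the literal key text, group 3 = the value
word_only = re.search(r'(.)("schemeCode":")(\w+)', s).group(3)
up_to_quote = re.search(r'(.)("schemeCode":")([^"]*)', s).group(3)

print(word_only)    # ZEROQ1
print(up_to_quote)  # ZEROQ1 account (ABCD)
```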
【Solution 2】:

To extract JSON-like data, use the from_json function. It takes a schema as input. Also, your JSON is malformed (it uses single quotes), so you need to add the option {"allowSingleQuotes": "true"}.

from pyspark.sql import functions as F, types as T

# Assumes df already has the caseid and object_value columns from the question

schm = T.StructType(
    [
        T.StructField("dummyAcc", T.StringType()),
        T.StructField(
            "accountRequest",
            T.StructType(
                [
                    T.StructField("schemeCode", T.StringType()),
                    T.StructField("CCZ", T.StringType()),
                ]
            ),
        ),
    ]
)

df.withColumn(
    "object_value",
    F.from_json("object_value", schm, options={"allowSingleQuotes": "true"}),
).select(
    "caseid",
    "object_value.accountRequest.schemeCode",
    "object_value.accountRequest.CCZ",
).show()

+------+----------+---+
|caseid|schemeCode|CCZ|
+------+----------+---+
|     1|    ZEROQ1|SGD|
|     2|    ZEROQ2|SGD|
|     3|    ZEROQ5|SGD|
|     4|     ZEROQ|SGD|
|     5|     ZEROQ|SGD|
+------+----------+---+

【Comments】:

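【Solution 3】:

You can use get_json_object, which is simple. Note that with the $[*] wildcard path the extracted values keep their JSON double quotes, as the output shows.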

    import pyspark.sql.functions as f
    
    df = spark.createDataFrame([
      [1, """[{'dummyAcc':'12346','accountRequest':{'schemeCode':'ZEROQ1', 'CCZ':'SGD'}}]"""],
      [2, """[{'dummyAcc':'12347','accountRequest':{'schemeCode':'ZEROQ2', 'CCZ':'SGD'}}]"""],
      [3, """[{'dummyAcc':'12348','accountRequest':{'schemeCode':'ZEROQ5', 'CCZ':'SGD'}}]"""],
      [4, """[{'dummyAcc':'12349','accountRequest':{'schemeCode':'ZEROQ', 'CCZ':'SGD'}}]"""],
      [5, """[{'dummyAcc':'12350','accountRequest':{'schemeCode':'ZEROQ', 'CCZ':'SGD'}}]"""]
    ], schema='caseid int, object_value string')
    
    final_df = (df
                .select('caseid', 
                        f.get_json_object('object_value', '$[*].accountRequest.schemeCode').alias('schemeCode'),
                        f.get_json_object('object_value', '$[*].accountRequest.CCZ').alias('CCZ')))
    
    final_df.show(truncate=False)
    # +------+----------+-----+
    # |caseid|schemeCode|CCZ  |
    # +------+----------+-----+
    # |1     |"ZEROQ1"  |"SGD"|
    # |2     |"ZEROQ2"  |"SGD"|
    # |3     |"ZEROQ5"  |"SGD"|
    # |4     |"ZEROQ"   |"SGD"|
    # |5     |"ZEROQ"   |"SGD"|
    # +------+----------+-----+
    

【Comments】:
