【问题标题】:Creating a new column by reading json strings with inconsistent schema in pyspark通过读取 pyspark 中模式不一致的 json 字符串创建新列
【发布时间】:2022-02-03 22:50:10
【问题描述】:

我有一个 pyspark 数据框,其中重要信息作为 json 字符串存储在一个列中,这些字符串具有相似但不一致的模式。我的问题引发了三个问题,如下所示:

  1. 要使用 json 字符串列展平数据框,是否应该创建一个新的结构列并使用 explode
  2. UDF 是否将单个单元格值传递给它们包装的函数?
  3. 如何将不同长度的 json 数组和每个条目的字段加载到单个列中?

这发生在数据块和本地安装的 pyspark 中。

此表的 MWA 可以通过此代码生成:

from pyspark.sql.functions import from_json
from pyspark.sql.types import *

jstring_A = """[
{"a_id":"0001","a_s":"apple","score":1},
{"a_id":"0002","a_s":"banana","score":1},
{"a_id":"0003","a_s":"carrot","score":1}
]""".replace('\n','')
# This has 3 responses, each of the fields ["a_id", "a_s", "score"]

jstring_B = """[
{"a_id":"so1","a_R":"aardvark","score":5},
{"a_id":"so2","a_R":"baboon","score":9}
]""".replace('\n','')
# This has 2 responses, each of the fields ["a_id", "a_R", "score"]

data = [(1, jstring_A), (2, jstring_B)]
columns = ["_oid", "json_str"]
source_df = spark.createDataFrame(data=data, schema = columns)

生成这张表

+----+---------------------------------------------------------------------------------------------------------------------------+
|_oid|json_str                                                                                                                   |
+----+---------------------------------------------------------------------------------------------------------------------------+
|1   |[{"a_id":"0001","a_s":"apple","score":1},{"a_id":"0002","a_s":"banana","score":1},{"a_id":"0003","a_s":"carrot","score":1}]|
|2   |[{"a_id":"so1","a_R":"aardvark","score":5},{"a_id":"so2","a_R":"baboon","score":9}]                                        |
+----+---------------------------------------------------------------------------------------------------------------------------+

我的最终目标是扁平化这个数据框。我对执行此操作的最佳方法的理解是将字符串转换为数据框中的struct,然后使用explode。这将创建 5 行(每个响应一个),每行包含 _oid, json_str, a_id, a_s, a_R, score 列。

这个过程正确吗?

为了扁平化,我找到了this excellent question,它提供了获取模式中所有字段名称的方法。 This question 解释说,任何缺少值的架构字段都将简单地加载为 Null

这会产生以下代码

all_fields = spark.read.json(source_df.select("json_str").rdd.map(lambda x: x[0])).schema

和结果模式

StructType(List(StructField(a_R,StringType,true),StructField(a_id,StringType,true),StructField(a_s,StringType,true),StructField(score,LongType,true)))

此时,我们的问题发生了分歧,因为这里的每个 json 对象中的响应数量是可变的,因此我们不能简单地使用该模式。因为列的模式不是恒定的,所以我们不能传递from_json 列。但是,它可以传递一个字符串。

为此,我使用了.withColumn 和udf。

import json as pj

def create_struct(json_str):
  n_responses = len(pj.loads(json_str))
  schema = StructType(
              [StructField(
                            name = f"{i}",
                            dataType= all_fields,
                            nullable= True
                          )
               for i in range(n_responses)]
  )
  return(from_json(json_str,schema))
  
json_as_struct = udf(lambda z: create_struct(z))

new_df = source_df.withColumn("as_struct", json_as_struct(col("json_str")))

根据我对 UDF 的理解,Spark 将在此处分配迭代,并通过将 json_str 的相应值传递给 UDF 来逐个单元格地创建列 as_structUDF 是这样工作的吗?

此代码的第一个错误迹象是new_df。在评估之前,它的架构是

DataFrame[_oid: bigint, json_str: string, as_struct: string]

正如人们所期望的那样,UDF 上的默认返回类型是 string

当进行操作调用 (new_df.display()) 时,会发生真正的错误:

PythonException: 'AttributeError: 'NoneType' object has no attribute '_jvm'', from <command-124...>, line 17. Full traceback below:

完整的追溯如下。

这引出了我的最后一个问题: 我可以使用 UDF 将这些 json 字符串转换为结构吗?如果可以,我应该对我的代码进行哪些更改才能使其正常工作?


完整的追溯:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2033.0 failed 4 times, most recent failure: Lost task 2.3 in stage 2033.0 (TID 32875, 10.60.162.7, executor 167): org.apache.spark.api.python.PythonException: 'AttributeError: 'NoneType' object has no attribute '_jvm'', from <command-124...>, line 17. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 231, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 145, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 220, in _batched
    for item in iterator:
  File "/databricks/spark/python/pyspark/worker.py", line 467, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 467, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 91, in <lambda>
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 109, in wrapper
    return f(*args, **kwargs)
  File "<command-124...>", line 19, in <lambda>
  File "<command-124...>", line 17, in create_struct
  File "/databricks/spark/python/pyspark/sql/functions.py", line 2412, in from_json
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, _options_to_str(options))
AttributeError: 'NoneType' object has no attribute '_jvm'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2339)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:298)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:308)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:82)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:58)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2994)
    at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:2985)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3709)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3707)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2984)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:194)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:57)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.generateTableResult(PythonDriverLocal.scala:1158)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:1070)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:857)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:939)
    at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:538)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:899)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$8(PythonDriverLocal.scala:384)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:857)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:371)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$10(DriverLocal.scala:431)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:48)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:48)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:408)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:653)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:645)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:486)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:598)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: 'AttributeError: 'NoneType' object has no attribute '_jvm'', from <command-124...>, line 17. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 654, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 646, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 231, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 145, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 220, in _batched
    for item in iterator:
  File "/databricks/spark/python/pyspark/worker.py", line 467, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 467, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/databricks/spark/python/pyspark/worker.py", line 91, in <lambda>
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 109, in wrapper
    return f(*args, **kwargs)
  File "<command-124...>", line 19, in <lambda>
  File "<command-124...>", line 17, in create_struct
  File "/databricks/spark/python/pyspark/sql/functions.py", line 2412, in from_json
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, _options_to_str(options))
AttributeError: 'NoneType' object has no attribute '_jvm'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

【问题讨论】:

    标签: json pyspark apache-spark-sql databricks


    【解决方案1】:

    一位资深开发人员为我提出了以下解决方案:

    1. 不,可以将regexp_replacefunctools.reduce结合使用,直接从json字符串中分解出来。
    2. 没有讨论
    3. 以下代码实现了预期的结果:
    def explode_col(source_df, colname):
      
      keys = spark.read.json(source_df.select(colname).rdd.map(lambda x: x[0])).columns
      
      source_df = source_df\
        .withColumn("json_trimmed", f.regexp_replace(f.col(colname),'\[|\]', ''))\
        .withColumn("json_trimmed", f.regexp_replace(f.col("json_trimmed"),'\"', ''))\
        .withColumn("json_rep", f.regexp_replace(f.col("json_trimmed"), '\},\{', '\}.\{'))\
        .withColumn("json_array", f.split(f.col("json_rep"), '\.'))\
        .withColumn('exploded', f.explode('json_array'))
      
      pat = "(?<=%s:)\w+(?=(,|}))"
      from functools import reduce
      exploded_col_df = reduce(
          lambda df, c: df.withColumn(
              c,
              f.when(
                  f.col("exploded").rlike(pat%c),
                  f.regexp_extract("exploded", pat%c, 0)
              )
          ),
          keys,
          source_df
      )
      return(exploded_col_df.drop(colname, "json_trimmed", "json_rep", "json_array", "exploded"))
                    
    explode_col(source_df, "json_str").show(truncate = False)
    

    【讨论】:

      猜你喜欢
      • 2020-12-09
      • 1970-01-01
      • 1970-01-01
      • 2018-03-06
      • 1970-01-01
      • 2023-02-01
      • 2022-06-10
      • 2017-04-27
      • 1970-01-01
      相关资源
      最近更新 更多