【Posted】: 2022-02-03 22:50:10
【Question】:
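I have a PySpark DataFrame in which the important information is stored as JSON strings in a single column. The strings have similar but inconsistent schemas. My problem raises three questions:
- To flatten a DataFrame that has a JSON string column, should I create a new struct column and use explode?
- Do UDFs pass individual cell values to the function they wrap?
- How can I load JSON arrays of varying length, with varying fields per entry, into a single column?
This happens both on Databricks and on a locally installed PySpark.
An MWE of this table can be generated with the following code: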
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
jstring_A = """[
{"a_id":"0001","a_s":"apple","score":1},
{"a_id":"0002","a_s":"banana","score":1},
{"a_id":"0003","a_s":"carrot","score":1}
]""".replace('\n','')
# This has 3 responses, each of the fields ["a_id", "a_s", "score"]
jstring_B = """[
{"a_id":"so1","a_R":"aardvark","score":5},
{"a_id":"so2","a_R":"baboon","score":9}
]""".replace('\n','')
# This has 2 responses, each of the fields ["a_id", "a_R", "score"]
data = [(1, jstring_A), (2, jstring_B)]
columns = ["_oid", "json_str"]
source_df = spark.createDataFrame(data=data, schema = columns)
which produces this table:
+----+---------------------------------------------------------------------------------------------------------------------------+
|_oid|json_str |
+----+---------------------------------------------------------------------------------------------------------------------------+
|1 |[{"a_id":"0001","a_s":"apple","score":1},{"a_id":"0002","a_s":"banana","score":1},{"a_id":"0003","a_s":"carrot","score":1}]|
|2 |[{"a_id":"so1","a_R":"aardvark","score":5},{"a_id":"so2","a_R":"baboon","score":9}] |
+----+---------------------------------------------------------------------------------------------------------------------------+
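My end goal is to flatten this DataFrame. My understanding of the best way to do this is to convert the strings into a struct within the DataFrame and then use explode. That would create 5 rows (one per response), each with the columns _oid, json_str, a_id, a_s, a_R, score.
Is this process correct?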
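To make the target concrete, here is a plain-Python sketch (no Spark involved) of the flattened result described above. The helper name `flatten_rows` and the choice to fill absent fields with `None` are assumptions for illustration only, and the `json_str` column is left out of the output rows for brevity.

```python
import json

jstring_A = (
    '[{"a_id":"0001","a_s":"apple","score":1},'
    '{"a_id":"0002","a_s":"banana","score":1},'
    '{"a_id":"0003","a_s":"carrot","score":1}]'
)
jstring_B = (
    '[{"a_id":"so1","a_R":"aardvark","score":5},'
    '{"a_id":"so2","a_R":"baboon","score":9}]'
)
data = [(1, jstring_A), (2, jstring_B)]

# Union of the fields that appear in any response.
FIELDS = ["a_id", "a_s", "a_R", "score"]


def flatten_rows(rows, fields=FIELDS):
    """Yield one flat dict per response; fields a response lacks become None."""
    for oid, json_str in rows:
        for response in json.loads(json_str):
            flat = {"_oid": oid}
            for name in fields:
                flat[name] = response.get(name)
            yield flat


flat_rows = list(flatten_rows(data))
for row in flat_rows:
    print(row)
```

This yields five rows: three for `_oid` 1 with `a_R` empty, and two for `_oid` 2 with `a_s` empty.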
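To do the flattening, I found this excellent question, which gives a way to get all the field names in a schema. This question explains that any schema field with a missing value is simply loaded as null.
This leads to the following code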
all_fields = spark.read.json(source_df.select("json_str").rdd.map(lambda x: x[0])).schema
and the resulting schema
StructType(List(StructField(a_R,StringType,true),StructField(a_id,StringType,true),StructField(a_s,StringType,true),StructField(score,LongType,true)))
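As a cross-check on the inferred schema, the same set of field names can be collected without Spark. This is a plain-Python sketch, not what `spark.read.json` does internally; `collect_field_names` is a hypothetical helper, and it tracks only names, not types.

```python
import json

json_strings = [
    '[{"a_id":"0001","a_s":"apple","score":1},'
    '{"a_id":"0002","a_s":"banana","score":1},'
    '{"a_id":"0003","a_s":"carrot","score":1}]',
    '[{"a_id":"so1","a_R":"aardvark","score":5},'
    '{"a_id":"so2","a_R":"baboon","score":9}]',
]


def collect_field_names(strings):
    """Return the sorted union of keys over every response in every array."""
    names = set()
    for s in strings:
        for response in json.loads(s):
            names.update(response.keys())
    return sorted(names)


field_names = collect_field_names(json_strings)
print(field_names)  # ['a_R', 'a_id', 'a_s', 'score']
```

The names and their order agree with the four fields of the schema shown above.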
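At this point my problem diverges from those questions: because the number of responses in each JSON object varies here, we cannot simply use that schema. Because the schema of the column is not constant, we cannot pass from_json the column. It can, however, be passed a string.
To do this, I used .withColumn and a udf.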
import json as pj
from pyspark.sql.functions import col, udf

def create_struct(json_str):
    # Build one struct field per response, named "0", "1", ...
    n_responses = len(pj.loads(json_str))
    schema = StructType(
        [StructField(
            name = f"{i}",
            dataType= all_fields,
            nullable= True
         )
         for i in range(n_responses)]
    )
    # This is the call that raises inside the UDF (see the traceback below)
    return(from_json(json_str,schema))

json_as_struct = udf(lambda z: create_struct(z))
new_df = source_df.withColumn("as_struct", json_as_struct(col("json_str")))
As I understand UDFs, Spark will distribute the iteration here and build the column as_struct cell by cell, passing the corresponding value of json_str to the UDF. Is that how UDFs work?
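The `mapper` frame in the traceback further down (`f(*[a[o] for o in arg_offsets])`) suggests that the wrapped function is called once per row with plain Python values. The following is a plain-Python model of that call pattern, not PySpark's actual implementation; `apply_per_row` and `parse_responses` are hypothetical names. It also hints at why a function such as `from_json`, which builds a Column expression, is a poor fit inside the wrapped function: what arrives there is a `str`, not a Column.

```python
import json


def parse_responses(json_str):
    """Receive one cell value (a plain str) and return plain Python data."""
    assert isinstance(json_str, str)
    return json.loads(json_str)


def apply_per_row(func, rows, column_index):
    """Call func once per row, passing only the value of one column."""
    return [func(row[column_index]) for row in rows]


rows = [
    (1, '[{"a_id":"0001","a_s":"apple","score":1}]'),
    (2, '[{"a_id":"so1","a_R":"aardvark","score":5},'
        '{"a_id":"so2","a_R":"baboon","score":9}]'),
]

parsed = apply_per_row(parse_responses, rows, column_index=1)
print([len(p) for p in parsed])  # [1, 2]
```

In this model the function only ever sees one string at a time, so any parsing it does has to be ordinary Python such as `json.loads`.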
The first sign of trouble with this code is new_df. Before evaluation, its schema is
DataFrame[_oid: bigint, json_str: string, as_struct: string]
which is to be expected, since the default return type of a UDF is string.
The real error occurs when an action is called (new_df.display()):
PythonException: 'AttributeError: 'NoneType' object has no attribute '_jvm'', from <command-124...>, line 17. Full traceback below:
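The full traceback is given below.
This leads to my final question: can I use a UDF to convert these JSON strings into structs? If so, what changes should I make to my code to get it working?
Full traceback: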
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2033.0 failed 4 times, most recent failure: Lost task 2.3 in stage 2033.0 (TID 32875, 10.60.162.7, executor 167): org.apache.spark.api.python.PythonException: 'AttributeError: 'NoneType' object has no attribute '_jvm'', from <command-124...>, line 17. Full traceback below:
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 654, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 646, in process
serializer.dump_stream(out_iter, outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 231, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/databricks/spark/python/pyspark/serializers.py", line 145, in dump_stream
for obj in iterator:
File "/databricks/spark/python/pyspark/serializers.py", line 220, in _batched
for item in iterator:
File "/databricks/spark/python/pyspark/worker.py", line 467, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/databricks/spark/python/pyspark/worker.py", line 467, in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/databricks/spark/python/pyspark/worker.py", line 91, in <lambda>
return lambda *a: f(*a)
File "/databricks/spark/python/pyspark/util.py", line 109, in wrapper
return f(*args, **kwargs)
File "<command-124...>", line 19, in <lambda>
File "<command-124...>", line 17, in create_struct
File "/databricks/spark/python/pyspark/sql/functions.py", line 2412, in from_json
jc = sc._jvm.functions.from_json(_to_java_column(col), schema, _options_to_str(options))
AttributeError: 'NoneType' object has no attribute '_jvm'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2339)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:298)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:308)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:82)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:58)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2994)
at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:2985)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3709)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3707)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2984)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:194)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:57)
at com.databricks.backend.daemon.driver.PythonDriverLocal.generateTableResult(PythonDriverLocal.scala:1158)
at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:1070)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:857)
at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:939)
at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:538)
at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:899)
at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$8(PythonDriverLocal.scala:384)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:857)
at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:371)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$10(DriverLocal.scala:431)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:48)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:48)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:408)
at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:653)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:645)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:486)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:598)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: 'AttributeError: 'NoneType' object has no attribute '_jvm'', from <command-124...>, line 17. Full traceback below:
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 654, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 646, in process
serializer.dump_stream(out_iter, outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 231, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/databricks/spark/python/pyspark/serializers.py", line 145, in dump_stream
for obj in iterator:
File "/databricks/spark/python/pyspark/serializers.py", line 220, in _batched
for item in iterator:
File "/databricks/spark/python/pyspark/worker.py", line 467, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/databricks/spark/python/pyspark/worker.py", line 467, in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/databricks/spark/python/pyspark/worker.py", line 91, in <lambda>
return lambda *a: f(*a)
File "/databricks/spark/python/pyspark/util.py", line 109, in wrapper
return f(*args, **kwargs)
File "<command-124...>", line 19, in <lambda>
File "<command-124...>", line 17, in create_struct
File "/databricks/spark/python/pyspark/sql/functions.py", line 2412, in from_json
jc = sc._jvm.functions.from_json(_to_java_column(col), schema, _options_to_str(options))
AttributeError: 'NoneType' object has no attribute '_jvm'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:598)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:551)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
【Discussion】:
Tags: json pyspark apache-spark-sql databricks