【Question Title】: PySpark - StructType can not accept object 'string indices must be integers' in type <class 'str'>
【Posted】: 2021-06-27 00:43:21
【Question】:

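Accessing a JSON value stored as a string inside a map function raises a TypeError:

TypeError: StructType can not accept object 'string indices must be integers' in type <class 'str'>

I tried many posts on Stack Overflow, such as "Dealing with non-uniform JSON columns in spark dataframe", but none of them worked.

Basically, the exception is thrown when the JSON response string is accessed.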

Input JSON records
{
  "testPolicy": "personal",
  "testPolicyVersion": "00",
  "creditModel": "",
  "personalRequest": {
    "testContract": {
      "requestType": "PRE-SCREEN",
      "transactionDateTimeStamp": "2020-11-24:T08:53:27",
      "transactionID": "e04bef99-37d8-4a2b-a588-b6d019ecb157",
      "testData": [
        {
          "DataHeaders": "a|b|c",
          "DataValues": "1|2|3",
          "Bureau": "TEST"
        }
      ]
    }
  }
}


import json

from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType


spark = SparkSession.builder.getOrCreate()

df = spark.read.format('csv').options(header='true', inferSchema='true').load("s3://bucket/response-pass1.csv")
df.show(1)
df.printSchema()


# Single-column schema for the extracted value
test_response = StructType([
    StructField('scoreVal', StringType(), True)
])

def processTestScore(row):
    odp_policy_res = row.json_response
    vals = ""
    try:
        if(odp_policy_res != "HTTP error500" ):
            resp = json.loads(odp_policy_res)
            vals = odp_policy_res['personalRequest']['testContract']['testData'][0]['DataValues']
        else:
            vals = odp_policy_res
    except Exception as e:
        vals = str(e)
    return Row(vals)
    

rdd_1 = df.rdd.map(lambda x : processTestScore(x))
df2 = spark.createDataFrame(data=rdd_1, schema = test_response )
df2.show()



This produces the following output and exception:


+--------------------+--------------------+--------------------+
|                  id|        json_request|       json_response|
+--------------------+--------------------+--------------------+
|fcc5487a-ad24-49d...|{"testPolicy": ...|{"testPolicy": ...|
+--------------------+--------------------+--------------------+
only showing top 1 row

root
 |-- id: string (nullable = true)
 |-- json_request: string (nullable = true)
 |-- json_response: string (nullable = true)

Py4JJavaError: An error occurred while calling o87644.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 662.0 failed 4 times, most recent failure: Lost task 0.3 in stage 662.0 (TID 21480, 10.87.12.222, executor 17): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 377, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 730, in prepare
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1389, in verify
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1377, in verify_struct
TypeError: StructType can not accept object 'string indices must be integers' in type <class 'str'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:297)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:289)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:859)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:859)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:347)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:347)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:409)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1477)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:415)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2190)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2190)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1058)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1058)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1058)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2430)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2373)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2362)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:824)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2358)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2379)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:407)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2548)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2548)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:79)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2548)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
    at sun.reflect.GeneratedMethodAccessor168.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

【Comments】:

    Tags: json apache-spark pyspark


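    【Solution 1】:

    I made a mistake when accessing the value: I indexed the original string as if it were a dict, assuming it had been converted by the json.loads call in the preceding statement. json.loads returns the parsed object and leaves its argument unchanged, so the lookup has to go through resp.

    It should be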

     resp = json.loads(odp_policy_res)
     vals = resp['personalRequest']['testContract']['testData'][0]['DataValues']
    

    instead of

    
    resp = json.loads(odp_policy_res)
    vals = odp_policy_res['personalRequest']['testContract']['testData'][0]['DataValues']
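
The difference between the two snippets can be reproduced without Spark. The sketch below uses a trimmed copy of the sample record from the question; the variable names `raw` and `message` are illustrative only. It also shows where the text quoted inside the StructType error comes from: it is the message of the TypeError raised by indexing a str with a key.

```python
import json

# A trimmed copy of the sample record from the question, as a JSON string.
raw = json.dumps({
    "personalRequest": {
        "testContract": {
            "testData": [
                {"DataHeaders": "a|b|c", "DataValues": "1|2|3", "Bureau": "TEST"}
            ]
        }
    }
})

# Indexing the raw string with a key fails: a str only accepts integer
# positions or slices.
try:
    raw["personalRequest"]
    message = None
except TypeError as e:
    message = str(e)

# Indexing the parsed object works, because json.loads returns a dict here.
resp = json.loads(raw)
vals = resp["personalRequest"]["testContract"]["testData"][0]["DataValues"]

print(message)
print(vals)
```

The exact wording of the TypeError message varies slightly between Python versions, but it begins with "string indices must be integers".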
    
    

    This fixed the problem. The corrected function:

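    def processTestScore(row):
        odp_policy_res = row.json_response
        vals = ""
        try:
            if odp_policy_res != "HTTP error500":
                # Parse the JSON string, then index the parsed dict (not the raw string)
                resp = json.loads(odp_policy_res)
                vals = resp['personalRequest']['testContract']['testData'][0]['DataValues']
            else:
                # Pass the HTTP error marker through unchanged
                vals = odp_policy_res
        except Exception as e:
            # On any failure, the exception message becomes the value
            vals = str(e)
        return Row(vals)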
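
One side effect of the function above is that the except branch stores the exception message as the value, which is how 'string indices must be integers' ended up being reported as data rather than as a visible failure. A possible variant, sketched here as an assumption and not as part of the original answer, keeps the value and the error apart. The helper name `extract_data_values` and the constant `HTTP_ERROR` are hypothetical; the sentinel string is copied from the comparison in the original function.

```python
import json

# Hypothetical constant; the string is the one compared against in the question.
HTTP_ERROR = "HTTP error500"


def extract_data_values(json_response):
    """Return (data_values, error); error is None on success."""
    if json_response is None:
        return None, "empty response"
    if json_response == HTTP_ERROR:
        return None, json_response
    try:
        resp = json.loads(json_response)
        test_data = resp["personalRequest"]["testContract"]["testData"]
        return test_data[0]["DataValues"], None
    except (ValueError, KeyError, IndexError, TypeError) as e:
        # Keep the failure visible, but out of the value column.
        return None, f"{type(e).__name__}: {e}"
```

In the question's pipeline this could stand in for processTestScore, for example by mapping each row's json_response through the helper and passing createDataFrame a schema with two nullable StringType fields, one for the value and one for the error. The field names are left to the reader.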
    

    【Discussion】:
