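【Posted】:2019-10-26 23:49:09
【Question】:
When I load data from an S3 bucket into a PySpark DataFrame, apply some operations (join, union), and then try to overwrite the same path I read from ('data/csv/'), I get this error: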
py4j.protocol.Py4JJavaError: An error occurred while calling o4635.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 in stage 120.0 failed 4 times, most recent failure: Lost task 200.3 in stage 120.0: java.io.FileNotFoundException: Key 'data/csv/part-00000-68ea927d-1451-4a84-acc7-b91e94d0c6a3-c000.csv' does not exist in S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
# `spark` is the active SparkSession (predefined in the pyspark shell).

# Existing data, read from the path that is overwritten at the end
csv_a = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('s3n://mybucket/data/csv')
    .where('some condition')
)

# New data to be joined with the categories
csv_b = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('s3n://mybucket/data/model/csv')
    .alias('csv')
)

# Read the Glue categories data
cc = (
    spark
    .sql("select * from mydatabase.mytable where month='06'")
    .alias('cc')
)

# Join, union, and overwrite the path csv_a was read from.
# save() returns None, so the result is not assigned to a variable.
(
    csv_b
    .join(cc, csv_b.key == cc.key, 'inner')
    .select('csv.key', 'csv.created_ts', 'cc.name', 'csv.text')
    .drop_duplicates(['key'])
    .union(csv_a)
    .orderBy('name')
    .coalesce(1)
    .write
    .format('csv')
    .option('header', 'true')
    .mode('overwrite')
    .save('s3n://mybucket/data/csv')
)
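I need to read data from an S3 location, join and union it with other data, and finally overwrite the original path so that it holds a single CSV file with the clean joined data.
If I read (load) the data from an S3 path other than the one I need to overwrite, the job works and the overwrite succeeds.
Any idea why this error occurs?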
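One plausible reading of the error, stated here as an assumption rather than something confirmed in this thread: the read is lazy, and overwrite mode clears the target path before the write job actually opens the input files, so the files listed at load time are gone by the time they are read. The sketch below is a toy model in plain Python (generators and a local temp directory, not Spark or S3) that reproduces that ordering problem and shows why writing to a different path first avoids it. All names in it (`lazy_read`, `overwrite`, `overwrite_via_staging`, `make_input`) are hypothetical helpers for illustration.

```python
import csv
import os
import shutil
import tempfile


def lazy_read(directory):
    """List the CSV files now, but open them only when rows are consumed."""
    paths = [os.path.join(directory, n) for n in sorted(os.listdir(directory))]

    def rows():
        for path in paths:
            with open(path, newline="") as handle:
                yield from csv.reader(handle)

    return rows()


def overwrite(rows, directory):
    """Clear `directory` first, then write `rows` into a single CSV file."""
    shutil.rmtree(directory, ignore_errors=True)
    os.makedirs(directory)
    with open(os.path.join(directory, "part-00000.csv"), "w", newline="") as handle:
        csv.writer(handle).writerows(rows)  # the lazy rows are pulled only here


def overwrite_via_staging(rows, directory, staging):
    """Materialize into `staging`, then overwrite `directory` from the staged copy."""
    overwrite(rows, staging)                  # source files still exist while read
    overwrite(lazy_read(staging), directory)  # source and target now differ
    shutil.rmtree(staging)


def make_input(base, name):
    """Create a directory holding one small CSV file and return its path."""
    directory = os.path.join(base, name)
    os.makedirs(directory)
    with open(os.path.join(directory, "input.csv"), "w", newline="") as handle:
        csv.writer(handle).writerows([["key", "name"], ["1", "a"]])
    return directory


base = tempfile.mkdtemp()

# Direct overwrite of the path being read: the input is deleted before it is opened.
direct_dir = make_input(base, "direct")
try:
    overwrite(lazy_read(direct_dir), direct_dir)
    direct_error = None
except FileNotFoundError as exc:
    direct_error = exc

# Staged overwrite: the data survives and ends up in the original directory.
staged_dir = make_input(base, "staged")
staging_dir = os.path.join(base, "staging")
overwrite_via_staging(lazy_read(staged_dir), staged_dir, staging_dir)
staged_rows = list(lazy_read(staged_dir))

print(type(direct_error).__name__, staged_rows)
shutil.rmtree(base)
```

If the same reasoning holds for the job above, the analogous step would be to save the result to a separate S3 prefix first and only then overwrite 'data/csv' from that copy, which matches the observation that loading from a different path works.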
【Comments】:
Tags: python amazon-s3 pyspark amazon-emr