【问题标题】:Overwrite csv file on s3 fails in pyspark在 s3 上覆盖 csv 文件在 pyspark 中失败
【发布时间】:2019-10-26 23:49:09
【问题描述】:

当我从 s3 存储桶将数据加载到 pyspark 数据帧中时,然后进行一些操作(加入、联合),然后我尝试覆盖我之前读过的相同路径 ('data/csv/')。我收到此错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o4635.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 in stage 120.0 failed 4 times, most recent failure: Lost task 200.3 in stage 120.0: java.io.FileNotFoundException: Key 'data/csv/part-00000-68ea927d-1451-4a84-acc7-b91e94d0c6a3-c000.csv' does not exist in S3
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
csv_a = spark \
    .read \
    .format('csv') \
    .option("header", "true") \
    .load('s3n://mybucket/data/csv') \
    .where('some condition')

csv_b = spark \
    .read \
    .format('csv') \
    .option("header", "true") \
    .load('s3n://mybucket/data/model/csv')
    .alias('csv')

# Reading glue categories data
cc = spark \
    .sql("select * from mydatabase.mytable where month='06'") \
    .alias('cc')

# Joining and Union
output = csv_b \
    .join(cc, (csv_b.key == cc.key), 'inner') \
    .select('csv.key', 'csv.created_ts', 'cc.name', 'csv.text') \
    .drop_duplicates(['key']) \
    .union(csv_a) \
    .orderBy('name') \
    .coalesce(1) \
    .write \
    .format('csv') \
    .option('header', 'true') \
    .mode('overwrite') \
    .save('s3n://mybucket/data/csv')

我需要从 s3 位置读取数据,然后加入、联合另一个数据,最后覆盖初始路径以仅保留一个包含干净连接数据的 csv 文件。

如果我尝试从另一个与我需要覆盖的不同的 s3 路径读取(加载)数据,它可以正常工作并覆盖。

任何想法为什么会发生此错误?

【问题讨论】:

    标签: python amazon-s3 pyspark amazon-emr


    【解决方案1】:

    当您从文件夹中读取数据、对其进行修改并保存在您最初读取的数据之上时,spark 会尝试覆盖 s3(hdfs 上的文件)等上的相同键...

    我找到了 2 个选项:

    1. 将数据保存到临时文件夹,然后再次读取
    2. 使用 df.persist() 转储到内存、磁盘或两者中

    通过添加解决 .persist(StorageLevel.MEMORY_AND_DISK)

    output = csv_b \
        .join(cc, (csv_b.key == cc.key), 'inner') \
        .select('csv.key', 'csv.created_ts', 'cc.name', 'csv.text') \
        .drop_duplicates(['key']) \
        .union(csv_a) \
        .orderBy('name') \
        .coalesce(1) \
        .persist(StorageLevel.MEMORY_AND_DISK) \
        .write \
        .format('csv') \
        .option('header', 'true') \
        .mode('overwrite') \
        .save('s3n://mybucket/data/csv')
    

    【讨论】:

      猜你喜欢
      • 2021-08-02
      • 1970-01-01
      • 2013-05-14
      • 2012-08-16
      • 2020-04-07
      • 2018-03-01
      • 2015-05-25
      • 2018-08-27
      • 2022-10-05
      相关资源
      最近更新 更多