【问题标题】:How to overwrite a parquet file from where DataFrame is being read in Spark如何覆盖在 Spark 中读取 DataFrame 的镶木地板文件
【发布时间】:2020-01-20 14:42:30
【问题描述】:

这是我面临的问题的一个缩影,我遇到了错误。让我尝试在这里重现它。

我将DataFrame 保存为parquet,但是当我从parquet 文件重新加载DataFrame 并再次将其另存为parquet 时,出现错误。

valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex')=='Male')
# Save it back - This produces ERROR   
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

错误信息 -

执行程序 22): java.io.FileNotFoundException: 请求的文件 maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet 不存在。基础文件可能已更新。 您可以通过运行“REFRESH”显式地使 Spark 中的缓存无效 SQL 中的 TABLE tableName' 命令或通过重新创建 Dataset/DataFrame 参与。

另一个 SO question 解决了这个问题。建议的解决方案是refresh 表格,如下面的代码,但这没有帮助。问题在于元数据的刷新。不知道怎么刷新。

df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

此问题的解决方法: 解决此问题的一种不太优雅的方法是将DataFrame 保存为具有不同名称的parquet 文件,然后删除原始parquet 文件并最后,将此parquet 文件重命名为旧名称。

# Workaround
import os
import shutil

# Load it back
df = spark.read.format('parquet').load('.../temp')

# Save it back as temp1, as opposed to original temp      
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')

# Delete the original parquet file
shutil.rmtree('.../temp')

# Renaming the parquet folder.
os.rename('.../temp1','.../temp')

但是,问题是某些 DataFrame 非常大,这可能不是处理它的最佳方法。更不用说重命名是否会导致元数据出现问题,我不确定。

【问题讨论】:

标签: python apache-spark metadata parquet


【解决方案1】:

解决此错误的一种方法是缓存,对 df 执行操作(例如:df.show()),然后以“覆盖”模式保存 parquet 文件。

在python中:

save_mode = "overwrite"
df = spark.read.parquet("path_to_parquet")

....... make your transformation to the df which is new_df

new_df.cache()
new_df.show()

new_df.write.format("parquet")\
                .mode(save_mode)\
                .save("path_to_parquet")

【讨论】:

    【解决方案2】:

    当从缓存中取出数据时,它似乎工作正常。

    val df = spark.read.format("parquet").load("temp").cache()
    

    cache 是惰性操作,不会触发任何计算,我们必须添加一些虚拟操作。

    println(df.count()) //count over parquet files should be very fast  
    

    现在它应该可以工作了:

    df.repartition(1).write.mode(SaveMode.Overwrite).parquet("temp")
    

    【讨论】:

    • 我用.cache() 运行了代码。有时它可以工作,有时它会因错误而失败 - Caused by: java.io.IOException: Error: Stale file handle。但是,我认为这与您的代码无关,因为我之前在保存 DF 时看到过这个错误。在我接受这个作为答案之前,让我测试一下它的形式。谢谢。
    • 你能解释一下cache() 有什么不同吗?
    • 好吧,我不能 100% 确定它可以在大型集群上运行,我只在本地环境中测试过它。我可以想象在现场节点上运行作业时的情况,并且所有缓存数据的节点都被云提供商获取。在这种情况下,Spark 将尝试重新计算丢失的数据并最终失败并出现相同的异常。
    • 哦,我明白了。那么,您是否会说我提出的解决方法仍然更强大?您认为刷新我在问题中发布的metadata 会有所帮助吗?
    • cache 将数据存储在每个执行程序的本地磁盘上,然后当我们写入数据时,它将从磁盘中获取,而不是从远程文件中获取。
    猜你喜欢
    • 1970-01-01
    • 2019-11-20
    • 1970-01-01
    • 2017-01-22
    • 2020-02-03
    • 2019-02-05
    • 2020-08-15
    • 1970-01-01
    • 2021-08-27
    相关资源
    最近更新 更多