【问题标题】:can you overlap partitions when writing parquet files写拼花文件时可以重叠分区吗
【发布时间】:2020-11-01 14:14:20
【问题描述】:

我有一个非常大的数据框,大小约为 2TB。 我可以通过两列对它们进行分区:MODULEDATE 如果我将它们按MODULE 划分,每个模块可以有相同的日期,例如MODULE A 可能有日期2020-07-01 , 2020-07-02MODULE B 可能有2020-07-01 , 2020-07-05 等。 我需要先将它们按MODULE 进行分区,然后进行一些聚合和连接,然后才能最终按DATE 对它们进行分区和存储。我正在使用 pyspark 进行编码。

通过 MODULE 进行聚合和连接后,我将其附加到 parquet 文件并将整个 parquet 文件加载到数据框,然后按 DATE 对其进行分区。 问题是火花作业由于内存问题而终止。 MODULE分区可以直接按日期分区吗? 所以分区看起来像这样: 输入格式:s3://path/MODULE=A --> s3://path/DATE=2020-07-01 其中两个模块 AB 都存在于分区 DATE=2020-07-01 中?

这是我的原始代码,由于在集群中的时间很长并且内存不足而失败:

inpath="s3://path/file/"
outpath="s3://path/file_tmp.parquet"
fs = s3fs.S3FileSystem(anon=False)
uvaDirs = fs.ls(inpath)

#Load Data by Module
for uvapath in uvaDirs:
    customPath='s3://' + uvapath + '/'
    df1=spark.read.parquet(customPath)
    #Perform aggregations and joins
    df1.write.mode('append').parquet(outpath)
    
# Load - partition by date
df2=spark.read.parquet("s3://path/file_tmp.parquet")
df2.write.mode('overwrite').partitionBy("DATE").parquet("s3://path/final.parquet")

它成功创建了file_tmp.parquet,但在按日期加载和分区时失败。 任何帮助将不胜感激! 谢谢

【问题讨论】:

    标签: apache-spark amazon-s3 pyspark parquet hadoop-partitioning


    【解决方案1】:

    就像 delta 数据源可以做到这一点,delta store as parquet

    (spark.read
     .format("delta")
     .load(path)
     .where(partition)
     .repartition(numFilesPerPartition)
     .write
     .option("dataChange", "false")
     .format("delta")
     .mode("overwrite")
     .option("replaceWhere", partition)
     .save(path))
    
    // clean old file
    val deltaTable = DeltaTable.forPath(spark, tablePath)
    deltaTable.vacuum(0)
    

    参考:https://docs.delta.io/latest/best-practices.html#-delta-compact-files&language-python

    【讨论】:

    • 谢谢。这是将随机分区为特定大小的块的好方法。但是我保存为delta格式后,是否需要再次加载整个parquet按日期进行分区并保存?
    猜你喜欢
    • 2019-10-28
    • 1970-01-01
    • 2023-03-18
    • 2022-07-29
    • 2016-02-12
    • 2022-07-31
    • 1970-01-01
    • 1970-01-01
    • 2023-02-10
    相关资源
    最近更新 更多