【问题标题】:pyspark write to hive table in reduced/compressed number of small filespyspark 以减少/压缩的小文件数量写入配置单元表
【发布时间】:2020-10-10 01:22:11
【问题描述】:

每次进程运行时我都会更新一个数据帧记录,这意味着每次进程完成时我都会有一个一行 4 列的数据帧。 然后我将使用 dataframe write 和 parquet 格式将它插入到 hive 表中。 由于一次只有一条记录,我在 hfds 的 table 文件夹中看到了很多小文件。

当我将数据写入 hive 表时,您能否告诉我如何减少并将其写入同一个文件(parquet 文件)??

hdfs location: user_id/employe_db/market_table/
from:
part-04498-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04497-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04496-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04450-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet
part-04449-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

to:
part-03049-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

如何将镶木地板文件的数量减少到固定数量的更少文件并将新数据加载/写入现有文件? part-04499-f33fc4b5-47d9-4d14-b37e-8f670cb2c53c-c000.snappy.parquet

【问题讨论】:

    标签: dataframe hadoop pyspark hive hdfs


    【解决方案1】:

    在写信给 HDFS 之前,您可以 repartition(1) 以便每次执行创建 1 个文件。

    df.repartition(1).write.parquet("<directory>")
    

    Merging files:

    Using Hive:

    如果您已经在 user_id/employe_db/market_table/ 目录顶部有 hive 表,则通过选择同一个表运行插入覆盖。

    spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market>")
    

    --只创建一个文件然后使用order by

    spark.sql("insert overwrite table <db>.<tab_market> select * from <db>.<tab_market> order by <column>")
    

    您也可以像在 Hive 中一样运行插入语句。

    (或)

    Using Spark:

    作为后期摄取过程,您可以再次从目录中读取 parquet 文件,然后再次重新分区并写入目录。

    df_src=spark.read.parquet("<directory>")
    df_src.repartition(<number>).write.mode("overwrite").parquet("<directory>")
    

    NOTE

    • 覆盖首先删除目录,以防万一在这之间作业失败,我们可能会遇到数据丢失。
    • 最佳做法是将数据备份到 tmp 目录然后仅覆盖

    【讨论】:

    • 要使用这个答案,首先按照注释中的建议备份数据,否则这将导致FileNotFoundException看到这个答案-stackoverflow.com/a/61052681/4758823
    • 其实我没有得到。我尝试这段代码的原因是,在我们的 HDFS 命名空间中,特定目录中的文件数量不超过 10K 文件数量,所以在写入该表路径时我怎么能确保有更少的镶木地板文件???
    • 谢谢@Shu。但是, df.repartition(1).write.parquet("") - 正确它会在重新分区后创建一个文件。但是,那些已经在那个 hdfs 文件夹中的文件呢?多个小文件越来越多,这就是我面临的问题。那你有什么办法解决吗?
    • @SureshGudimetla,在答案中,我在 Merging files: 部分的表格中添加了合并文件的方法。通过使用这些方法,您可以在 HDFS 中合并小文件并创建更大的文件。
    • @Shu 谢谢,它成功了。但是,当我插入新数据时 - 所以 1. 我将数据附加到现有表中。 2. 根据您的观点,我正在读取数据并使用 repartiition(1) 覆盖到同一个表可以看到相同数量的记录。
    猜你喜欢
    • 1970-01-01
    • 2013-09-11
    • 2020-03-27
    • 1970-01-01
    • 2013-07-25
    • 1970-01-01
    • 2020-04-23
    • 1970-01-01
    • 2011-08-18
    相关资源
    最近更新 更多