【问题标题】:How to merge part files in HDFS?如何在 HDFS 中合并部分文件?
【发布时间】:2021-12-16 10:14:51
【问题描述】:

我想要什么

我在此类目录中有 17TB 的日期分区数据:

/data_folder
  /date=2021.01.01
    /part-00002-f0b91523-6e0c-4adc-88cc-e9451614791d.c000.snappy.parquet
    /part-00002-f0193442-c20e-49d2-bde1-70053ae2a254.c000.snappy.parquet
    /... over 9000 part files 
  /date=2021.01.02
    /part-00002-bdb50c33-fd32-4e87-9edb-cec77973760b.c000.snappy.parquet
    /part-00001-e2cd906e-5669-46d7-92e9-7498ed60487f.c000.snappy.parquet
    /... over 9000 part files 

我想让它看起来像这样:

/data_folder
  /date=2021.01.01
    /merge.parquet
  /date=2021.01.02
    /merge.parquet

我之所以想要这个,是因为我听说 HDFS 更适合存储少量大文件,而不是大量小文件。现在我的查询变得非常缓慢。希望这个优化能加快他们的速度

我做什么

所以我运行这样的命令:

hdfs dfs -getmerge /data_folder/date=2021.01.01 merge.parquet;
hdfs dfs -copyFromLocal -f -t 4 merge.parquet /merged/date=2021.01.01/merge.parquet;

我得到了我想要的目录结构,但现在我无法读取文件。查询:

%spark2.spark

val date = "2021.01.01"


val ofdCheques2Uniq = spark.read
    .parquet(s"/projects/khajiit/data/OfdCheques2/date=$date")
    .withColumn("chequeId", concat($"content.cashboxRegNumber", lit("_"), $"content.number", lit("_"), col("content.timestamp")))
    .dropDuplicates("chequeId")
    
val ofdChequesTempUniq = spark.read
    .parquet(s"/projects/khajiit/data/OfdChequesTemp/date=$date")
    .withColumn("chequeId", concat($"content.cashboxRegNumber", lit("_"), $"content.number", lit("_"), col("content.timestamp")))
    .dropDuplicates("chequeId")

println(s"OfdCheques2   : ${ofdCheques2Uniq.count} unique cheques")
println(s"OfdChequesTemp: ${ofdChequesTempUniq.count} unique cheques")

打印:

OfdCheques2   : 4309 unique cheques
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 74.0 failed 4 times, most recent failure: Lost task 0.3 in stage 74.0 (TID 1720, srs-st-hdp-s3.dev.kontur.ru, executor 1): java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 13

同时,这样的查询:

val ofdCheques2Uniq = spark.read
    .parquet(s"/projects/khajiit/data/OfdCheques2/date=$date")
    
val ofdChequesTempUniq = spark.read
    .parquet(s"/projects/khajiit/data/OfdChequesTemp/date=$date")

println(s"OfdCheques2   : ${ofdCheques2Uniq.count} unique cheques")
println(s"OfdChequesTemp: ${ofdChequesTempUniq.count} unique cheques")

打印:

OfdCheques2   : 5290 unique cheques
OfdChequesTemp: 18 unique cheques

最后的问题

  1. getmerge 命令是否适用于我的问题?如果是这样,我做错了什么?
  2. 解决此问题的最佳方法是什么?

【问题讨论】:

  • 我假设我忘记复制描述数据格式的文件。问题是我不知道它们存储在哪里,甚至不知道它们是否存在
  • 您不需要合并它们,因为几乎所有 Hadoop FS API 都可以读取整个目录路径
  • 而且,不,getmerge 没有解决问题,因为您实际上是将 TB 的数据复制到一台机器上,然后重新上传它(当您的客户端机器只有很少的GB 的磁盘空间)
  • > 您不需要合并它们,因为几乎所有 Hadoop FS API 都可以读取整个目录路径@OneCricketeer,但大量文件可能会使 NameNode 过载。我想消除这种过载
  • 如果传入时间序列数据,实际的 TSDB 比 Hadoop 更有意义。或德鲁伊/黑皮诺。此外,您可能希望跟踪摄取时间,而不仅仅是记录日期本身。

标签: apache-spark hadoop hdfs parquet


【解决方案1】:

得到我想要的目录结构,但现在我无法读取文件

这是由于 Parquet 文件的二进制结构。它们具有页眉/页脚元数据,用于存储模式和文件中的记录数...getmerge 因此实际上只对行分隔的非二进制数据格式有用。

您可以做的是拥有spark.read.path("/data_folder"),然后是repartitioncoalesce 该数据框,然后输出到新的“合并”输出位置

另一种选择是 Gobbilin - https://gobblin.apache.org/docs/user-guide/Compaction/

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-11-08
    • 1970-01-01
    • 2013-12-04
    • 2017-07-20
    • 2021-03-10
    • 2021-01-31
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多