【发布时间】:2018-03-30 10:59:47
【问题描述】:
嗨,我有我的 spark 数据框的输出,它创建文件夹结构并创建部分文件。 现在我必须合并文件夹内的所有部分文件,并将该文件重命名为文件夹路径名。
这就是我做分区的方式
df.write.partitionBy("DataPartition","PartitionYear")
.format("csv")
.option("nullValue", "")
.option("header", "true")/
.option("codec", "gzip")
.save("hdfs:///user/zeppelin/FinancialLineItem/output")
它创建这样的文件夹结构
hdfs:///user/zeppelin/FinancialLineItem/output/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz
hdfs:///user/zeppelin/FinancialLineItem/output/DataPartition=Japan/PartitionYear=1971/part-00002-87a61115-92c9-4926-a803-b46315e55a08.c001.csv.gz
我必须像这样创建最终文件
hdfs:///user/zeppelin/FinancialLineItem/output/Japan.1971.currenttime.csv.gz
这里没有零件文件bith 001和002是合二为一的。
我的数据非常大,300 GB gzip 和 35 GB 压缩所以coalesce(1) and repartition 变得非常慢。
我在这里看到了一种解决方案 Write single CSV file using spark-csv 但我无法实现它,请帮助我。
重新分区抛出错误
error: value repartition is not a member of org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]
dfMainOutputFinalWithoutNull.write.repartition("DataPartition","StatementTypeCode")
【问题讨论】:
-
我想您合并文件的动机是在 Spark 的外部处理它。在这种情况下,我会说该方法是将它们 在 Spark 之外 合并,因为您放弃了数据的分布式特性,而这本质上是使用 Spark 处理数据的原因。跨度>
-
为什么要合并所有文件?被分割成多个部分的文件非常适合使用 Spark 阅读。此外,HDFS 并不意味着像这样保存单个大文件,所以如果你打算这样做,它应该保存到集群的头节点。这是一个替代 HDFS 的选项吗?
-
@Anupam 好的 - 为什么要将它们合并到一个文件中?
-
@DanCiborowski-MSFT 我必须将这些文件交付给客户,他们希望它们采用相同的格式..我们至少可以控制每个分区的文件数,例如每个分区 5 个文件吗?目前它为甚至有 1 GB 文件的分区创建 200 多个 ..
-
目前它为甚至 1 GB 文件的分区创建了 200 多个,这可能是因为您可能正在运行分组(洗牌)类型的转换。当然你可以限制
rdd/df.repartition(x)。 x 是您要为该 rdd/df 创建的文件数
标签: scala apache-spark hdfs spark-dataframe hadoop2