【问题标题】:How can I control the number of output files written from Spark DataFrame?如何控制从 Spark DataFrame 写入的输出文件的数量?
【发布时间】:2018-11-15 00:14:19
【问题描述】:

使用 Spark 流从 Kafka 主题中读取 Json 数据。
我使用 DataFrame 处理数据,稍后我希望将输出保存到 HDFS 文件。问题是使用:

df.write.save("append").format("text")

产生许多文件,有些很大,有些甚至是 0 字节。

有没有办法控制输出文件的数量?另外,为了避免“相反”的问题,有没有办法限制每个文件的大小,以便在当前达到一定大小/行数时写入一个新文件?

【问题讨论】:

  • 第一部分有coalesce/repartition,第二部分没有什么干净和简单的。您可能应该为此使用 bash 命令 split
  • @philantrovert 这不是真的,因为 spark 2.2 你可以使用maxRecordsPerFile,例如df.write.option("maxRecordsPerFile", 10000)..,参见例如gatorsmile.io/…
  • @RaphaelRoth 非常感谢!我根本不知道这件事。这非常有用。
  • @RaphaelRoth 这正是我想要的!谢谢。

标签: scala apache-spark apache-kafka apache-spark-sql spark-streaming


【解决方案1】:

您可以使用尺寸估算器:

import org.apache.spark.util.SizeEstimator
val size  = SizeEstimator.estimate(df)

接下来,您可以根据数据框的大小调整文件数量,并使用 repatition 或 coalesce

【讨论】:

  • 感谢您的帮助。 SizeEstimator 引用行?
  • 对象大小,以字节为单位。
【解决方案2】:

输出文件的数量等于Dataset的分区数量这意味着您可以根据上下文通过多种方式控制它:

  • 对于没有广泛依赖关系的 Datasets,您可以使用阅读器特定参数控制输入
  • 对于具有广泛依赖关系的Datasets,您可以使用spark.sql.shuffle.partitions 参数控制分区数。
  • 您可以coalescerepartition 独立于血统。

有没有办法限制每个文件的大小,以便在当前文件达到一定大小/行数时写入新文件?

没有。对于内置的 writer,严格来说是 1:1 的关系。

【讨论】:

  • 从 spark 2.2 开始,您可以使用 maxRecordsPerFile,例如df.write.option("maxRecordsPerFile", 10000),见gatorsmile.io/…
猜你喜欢
  • 2015-09-23
  • 1970-01-01
  • 2018-10-18
  • 1970-01-01
  • 1970-01-01
  • 2018-12-08
  • 1970-01-01
  • 2023-03-16
  • 1970-01-01
相关资源
最近更新 更多