【问题标题】:Spark Structured Streaming writing to parquet creates so many filesSpark Structured Streaming 写入 parquet 会创建如此多的文件
【发布时间】:2017-07-10 16:09:03
【问题描述】:

我使用结构化流从 kafka 加载消息,进行一些聚合然后写入 parquet 文件。问题是为来自 kafka 的 100 条消息创建了太多 parquet 文件(800 个文件)。

聚合部分是:

return model
            .withColumn("timeStamp", col("timeStamp").cast("timestamp"))
            .withWatermark("timeStamp", "30 seconds")
            .groupBy(window(col("timeStamp"), "5 minutes"))
            .agg(
                count("*").alias("total"));

查询:

StreamingQuery query = result //.orderBy("window")
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("parquet")
            .option("checkpointLocation", "c:\\bigdata\\checkpoints")
            .start("c:\\bigdata\\parquet");

当使用 spark 加载一个 parquet 文件时,它显示为空

+------+-----+
|window|total|
+------+-----+
+------+-----+

如何将数据集保存到一个 parquet 文件中? 谢谢

【问题讨论】:

  • 您可以使用result.repartition(1)。但这可能会导致OOM Exception。你可以给repartition()一些好的数字,以避免OOM Exception。
  • 它仍然会创建空的镶木地板文件。似乎每次查询处理时,它都会将结果写入单独的拼花文件。如何指定文件名并限制查询只在该文件上写入和更新?
  • 你做了什么来解决这个问题?
  • 我遇到了同样的问题,你是怎么解决的?谢谢!
  • 没有人解决这个问题吗?我迷路了。

标签: apache-spark spark-streaming parquet


【解决方案1】:

我的想法是使用 Spark Structured Streaming 来使用来自 Azure Even Hub 的事件,然后以 parquet 格式将它们存储在存储中。

我终于想出了如何处理创建的许多小文件。 Spark 版本 2.4.0。

这就是我的查询的样子

dfInput
  .repartition(1, col('column_name'))
  .select("*")
  .writeStream
  .format("parquet")
  .option("path", "adl://storage_name.azuredatalakestore.net/streaming")
  .option("checkpointLocation", "adl://storage_name.azuredatalakestore.net/streaming_checkpoint")
  .trigger(processingTime='480 seconds')
  .start()

因此,我每 480 秒在存储位置创建一个文件。 要弄清楚文件大小和文件数之间的平衡以避免OOM错误,只需使用两个参数:分区数和processingTime,即批处理间隔。

希望您可以根据自己的用例调整解决方案。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-07-22
    • 1970-01-01
    • 1970-01-01
    • 2022-01-14
    • 1970-01-01
    • 2020-03-19
    • 1970-01-01
    • 2020-07-01
    相关资源
    最近更新 更多