【发布时间】:2017-07-10 16:09:03
【问题描述】:
我使用结构化流从 kafka 加载消息,进行一些聚合然后写入 parquet 文件。问题是为来自 kafka 的 100 条消息创建了太多 parquet 文件(800 个文件)。
聚合部分是:
return model
.withColumn("timeStamp", col("timeStamp").cast("timestamp"))
.withWatermark("timeStamp", "30 seconds")
.groupBy(window(col("timeStamp"), "5 minutes"))
.agg(
count("*").alias("total"));
查询:
StreamingQuery query = result //.orderBy("window")
.writeStream()
.outputMode(OutputMode.Append())
.format("parquet")
.option("checkpointLocation", "c:\\bigdata\\checkpoints")
.start("c:\\bigdata\\parquet");
当使用 spark 加载一个 parquet 文件时,它显示为空
+------+-----+
|window|total|
+------+-----+
+------+-----+
如何将数据集保存到一个 parquet 文件中? 谢谢
【问题讨论】:
-
您可以使用
result.repartition(1)。但这可能会导致OOM Exception。你可以给repartition()一些好的数字,以避免OOM Exception。 -
它仍然会创建空的镶木地板文件。似乎每次查询处理时,它都会将结果写入单独的拼花文件。如何指定文件名并限制查询只在该文件上写入和更新?
-
你做了什么来解决这个问题?
-
我遇到了同样的问题,你是怎么解决的?谢谢!
-
没有人解决这个问题吗?我迷路了。
标签: apache-spark spark-streaming parquet