【发布时间】:2021-06-26 05:33:33
【问题描述】:
所以我正在读取一个满是 csv 文件的目录,这些文件使用 Pyspark readStream() 和 maxFilesPerTrigger=1 按日期排序。在 Windows 上,它从 最早的 2010-12-01.csv 文件开始,并按顺序向前处理。我将行附加到控制台并指定水印。 2010-12-01.csv => 2010-12-02.csv => 2010-12-03.csv ...
(Windows)
streaming = spark.readStream.format("csv").schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.load("D:\\data\\*.csv")
(Linux)
streaming = spark.readStream.format("csv").schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.load("file:///opt/data/*.csv")
(Same on both)
stream = streaming.selectExpr("car", "cost", "timestamp")\
.withWatermark("timestamp", "30 seconds")\
.groupBy(F.col("car"), F.window("timestamp", "1 hour").alias("tmst_window"))\
.agg(F.sum("cost").alias("agg_cost"))
stream.writeStream.format("console")\
.queryName("customer_purchases")\
.option('truncate', False)\
.outputMode("append").start()
现在,当我在 Linux 上运行完全相同的代码时,它会从一组 csv 文件中随机读取,而不是像 Windows 那样从一开始就开始。我在 Linux 上遇到的一个问题是,它会在前几分钟内读取最后一个文件(2011-12-10.csv,+1 年差异),然后停止处理其余文件。它认为它已经完成,可能是因为其余文件“落后于”它处理的带有水印的文件日期并停止。在 Windows 上,我得到数千行,在 Linux 上,我只得到 41 行。在 Windows 和 Linux 上连续运行时行为保持不变。有人知道为什么会存在这种行为吗?
两者都是独立实例:
Linux:PySpark 版本 2.4.0.16(datastax docker 映像)
Windows:spark-3.1.1-bin-hadoop3.2
【问题讨论】:
-
显示如何写入数据?使用什么输出模式?
-
@AlexOtt 我已经用输出更新了我的问题,将模式附加到控制台
标签: apache-spark pyspark spark-streaming datastax