【发布时间】:2021-09-07 20:25:43
【问题描述】:
我正在使用 Apache Spark 编写一个连续的应用程序。在结构化流式传输案例中,我尝试从 Delta 表中读取数据,通过时间窗口对事件时间执行流式聚合,然后以追加模式将结果写入 Delta 表。我对文档的期望是,在附加模式下,只有一个时间窗口的最终聚合将被写入接收器。这不是我的经验。相反,我在目标 Delta 表中看到如下记录,与我尝试使用流的许多配置无关(windowDuration=5 分钟,slideDuration=20 秒)。
从上图中可以看出,同一个时间窗口正在向 sink 贡献许多记录。我确认每个微批次最多输出一个时间窗口的单个记录,但是一个时间窗口可以贡献来自许多(数量上并不明显一致)微批次的输出记录。这里是流聚合代码的核心。
output_schema = create_trades_data_features_schema()
features_sdf = (trades_sdf.withWatermark("event_datetime", f"{trades_stream_watermark_secs} seconds")
.withColumn('time_window', f.window(timeColumn=f.col('event_datetime'),
windowDuration=f"{analysis_window_length_secs} seconds",
slideDuration=f"{analysis_window_hop_size_secs} seconds"))
.groupBy('time_window')
.applyInPandas(lambda pdf: generate_trades_data_features(pdf, output_schema, data_type_cast), output_schema))
Pandas UDF 创建一些保存标量值的变量,构造一个形状为 [1,N] 的 Pandas DataFrame,并将其作为结果输出。也就是说,它返回一行。我唯一分组的是时间窗口。我怎么能在同一个时间窗口获得多条记录?我以多种方式创建和关闭了流,每次都收到相同的结果(例如,根据Delta Lake docs、per the structured streaming guide,以及跨 read/load/table/toTable API 选项,尝试我能找到的每个选项配置...是的,数小时的蛮力)。我还尝试了水印持续时间和触发周期的各种值范围;没有任何影响。
这是追加模式下的预期行为(即同一时间窗口的多个记录)吗?
编辑:我使用的是 Databricks 运行时版本 8.3 ML。它具有 Spark 版本“3.1.1”。
编辑 2:我暂时考虑这个问题是否相关:https://issues.apache.org/jira/browse/SPARK-25756
【问题讨论】:
标签: pyspark spark-structured-streaming