【发布时间】:2019-05-13 14:00:44
【问题描述】:
我正在尝试通过在附加输出模式下工作来获得流聚合/组,以便能够在流到流连接中使用生成的流。我正在开发 (Py)Spark 2.3.2,我正在使用 Kafka 主题。
我的伪代码如下所示,在 Zeppelin 笔记本中运行
orderStream = spark.readStream().format("kafka").option("startingOffsets", "earliest").....
orderGroupDF = (orderStream
.withWatermark("LAST_MOD", "20 seconds")
.groupBy("ID", window("LAST_MOD", "10 seconds", "5 seconds"))
.agg(
collect_list(struct("attra", "attrb2",...)).alias("orders"),
count("ID").alias("number_of_orders"),
sum("PLACED").alias("number_of_placed_orders"),
min("LAST_MOD").alias("first_order_tsd")
)
)
debug = (orderGroupDF.writeStream
.outputMode("append")
.format("memory").queryName("debug").start()
)
在那之后,我预计数据会出现在debug 查询中,我可以从中进行选择(在 20 秒的延迟到达窗口到期之后。但调试查询中没有每个数据都出现(我等了几分钟) )
当我将输出模式更改为update 时,查询立即生效。
任何提示我做错了什么?
编辑:经过更多实验,我可以添加以下内容(但我仍然不明白)。
在启动 Spark 应用程序时,我使用的主题有很多旧数据(带有事件时间戳
在 eventTimestamp 非常接近当前时间的输入主题上生成新消息后,查询立即输出所有“排队”记录,并在查询中碰撞 eventTime 水印。
我还可以看到,时区似乎存在问题。我的 Spark 程序在 CET 中运行(当前为 UTC+2)。传入 Kafka 消息中的时间戳采用 UTC 格式,例如 "LAST__MOD": "2019-05-14 12:39:39.955595000"。我设置了spark_sess.conf.set("spark.sql.session.timeZone", "UTC")。尽管如此,在输入主题上生成“新”消息后的微批处理报告显示
"eventTime" : {
"avg" : "2019-05-14T10:39:39.955Z",
"max" : "2019-05-14T10:39:39.955Z",
"min" : "2019-05-14T10:39:39.955Z",
"watermark" : "2019-05-14T10:35:25.255Z"
},
所以 eventTime 以某种方式与输入消息中的时间相关联,但它是 2 小时。 UTC 差异已被减去两次。此外,我看不到水印计算是如何工作的。鉴于我将其设置为 20 秒,我预计它会比最大事件时间早 20 秒。但显然它比它大 4 分 14 秒。我看不出这背后的逻辑。
我很困惑……
【问题讨论】:
-
我怀疑水印没有严格设置为
MAX(eventtime)-threshold。相反,似乎对于水印,它只考虑事件时间>查询开始时间的事件。至少在我看来是这样的
标签: apache-spark-sql spark-structured-streaming