【问题标题】:KSQL Hopping Window : accessing only oldest subwindowKSQL Hopping Window:仅访问最旧的子窗口
【发布时间】:2019-01-18 13:07:27
【问题描述】:

我正在使用如下所示的查询来跟踪特定字段的滚动总和:

SELECT id, SUM(quantity) AS quantity from stream \
WINDOW HOPPING (SIZE 1 MINUTE, ADVANCE BY 10 SECONDS) \
GROUP BY id;

现在,对于每个输入刻度,它似乎返回了 6 个不同的聚合值,我猜它们是针对以下时间段的:

[start, start+60] seconds
[start+10, start+60] seconds
[start+20, start+60] seconds
[start+30, start+60] seconds
[start+40, start+60] seconds
[start+50, start+60] seconds

如果我有兴趣只为每个进入的滴答获得 [start, start+60] 秒的结果。有什么办法只得到那个吗?

【问题讨论】:

  • 应该是start+70start+80等吗?我不认为所有窗口都有相同的结束时间戳:)
  • 啊,我知道它是多么令人困惑。我的意思更多的是所包含的实际数据的时间范围。所以在前 60 秒结束时,数据被吐出就像:过去 60 秒、50 秒、40 秒、30 秒、20 秒和 10 秒的聚合。对吗?
  • 您的意思是,当您处理的第一条记录具有例如时间戳 1005 时,您会得到窗口 [950,1010), [960, 1020), [970,1030), [980, 1040), [990,1050)[1000,1060)?但是您只想获得窗口[1000,1060) 而没有旧窗口?
  • @Matthias J. Sax。使用流 API,如何仅根据您的示例获取窗口 [1000,1060) 结果的结果。过滤最新的窗口是唯一的选择吗?或者有什么办法可以抑制旧窗口的跳跃?

标签: apache-kafka streaming ksqldb


【解决方案1】:

因为你指定了一个跳跃窗口,每条记录落入多个窗口中,处理一条记录时需要更新所有窗口。只更新一个窗口是不正确的,结果也是错误的。

比较关于跳跃窗口的 Kafka Streams 文档(Kafka Streams 是 KSQL 的内部运行时引擎):https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#hopping-time-windows

更新

Kafka Streams 正在通过 KIP-450 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL) 添加适当的滑动窗口支持。这也应该允许稍后将滑动窗口添加到 ksqlDB。

【讨论】:

  • 我知道所有的窗口都需要更新才能使窗口正常工作,但我正在寻找一种只从 [start, start+60] 秒获得结果的方法,所以我可以仅将那些更新与窗口化结果转储到单独的主题中。有没有办法做到这一点?
  • 不是 100% 确定你的意思——我想你可以过滤——但是为什么你只想要一个窗口的结果呢?它是流处理而不是批处理,您只能选择特定的时间范围。或者我想念你的问题。
  • 我怎么能得到它,因为窗口信息本身不会以一种可以轻松过滤的方式直接公开,除非我有某种自定义 UDF 我猜它可以挑选出最小窗口的行时间?我需要这个的原因很简单。在任何时候,我只关心最后 60 秒内发生的事情。但是当 60 秒结束时,我不想丢弃所有 60 秒的数据,只丢弃最后 10 秒。因此,一个 60 秒的跳跃窗口提前 10 秒。
  • 我认为我们越来越了解彼此 :) KSQL 的工作方式是,它并行维护所有窗口,即,一个窗口没有单个“记录缓冲区”,并且随着时间的推移,记录会从窗口中删除,并且窗口会“移动”。 KSQL 为所有窗口并行维护一个“缓冲区”(注意,KSQL 不会将原始记录存储在“缓冲区”中,而是维护每个窗口的聚合结果)。因为所有窗口都是同时更新的,所以你会得到所有窗口的更新记录。
  • 请注意,KSQL 不会“关闭”窗口,也不会为每个窗口发出单个结果——相反,对窗口的每次更新都会立即报告。您可能想阅读这篇解释运行时模型的博文:confluent.io/blog/watermarks-tables-event-time-dataflow-model
【解决方案2】:

我遇到了类似的情况,并创建了一个用户定义的函数来仅访问带有 collect_list(column).size() = 窗口持续时间的窗口,这似乎是一个有前途的轨道。

在 udf 中使用 List 类型来获取您的聚合基列值列表之一。则判断形成的列表大小是否等于周期的跳跃窗口数,否则返回null。

从这里创建一个选择数据并使用 udf 转换它的表。

从这个最新的表中创建一个表,并过滤掉转换后的列上的空值。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-09
    相关资源
    最近更新 更多