【发布时间】:2019-11-20 05:31:46
【问题描述】:
Kafka Streams DSL 窗口聚合导致多个窗口。
@StreamListener("input")
public void process(KStream<String, Data> DataKStream) {
JsonSerde<DataAggregator> DataJsonSerde =
new JsonSerde<>(DataAggregator.class);
DataKStream
.groupByKey()
.windowedBy(TimeWindows.of(60000).advanceBy(30000))
.aggregate(
DataAggregator::new,
(key, Data, aggregator) -> aggregator.add(Data),
Materialized.with(Serdes.String(), DataJsonSerde)
);
}
DataAggregator.java
public class DataAggregator {
private List<String> dataList = new ArrayList<>();
public DataAggregator add(Data data) {
dataList.add(data.getId());
System.out.println(dataList);
return this;
}
public List<String> getDataList() {
return dataList;
}
}
我根据键对输入数据进行分组,然后执行 1 分钟窗口和 30 秒跳跃,在聚合器中我只是收集数据并显示。
我希望在开始时有 1 个窗口,30 秒后会出现另一个窗口。但是实际的输出是不同的,因为开始它本身就创建了 2 个窗口。
预期:
[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6] // till 30 seconds only one window
[6] // new window after 30 seconds
[1, 2, 3, 4, 5, 6, 7]
[6, 7]
[1, 2, 3, 4, 5, 6, 7, 8]
[6, 7, 8]
实际输出:
[1]
[1]
[1, 2]
[1, 2]
[1, 2, 3]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6] // duplicate window even before 30 seconds
[6] // new window after 30 seconds and 1 window from earlier will be dropped
[1, 2, 3, 4, 5, 6, 7]
[6, 7]
因为我在 1 分钟的窗口中创建了 30 秒的希望窗口。我相信,最初应该只有一个窗口,30 秒后应该创建另一个窗口。
有人可以告诉我,实际输出是预期行为还是我遗漏了什么?
注意:我每 4 秒获取一次输入数据,预期/实际输出仅用于表示。
来自 Kafka 文档:
跳跃时间窗口与纪元对齐,间隔较小 bound 是包含的,而上限是排除的。 “与 纪元”表示第一个窗口从时间戳零开始。为了 例如,跳跃窗口大小为 5000ms,提前间隔 3000 毫秒的(“跳跃”)具有可预测的窗口边界 [0;5000),[3000;8000),... - 而不是 [1000;6000),[4000;9000),... 甚至 一些“随机”的东西,例如 [1452;6452),[4452;9452),....
【问题讨论】:
-
我相信这是来自 Kafka Streams 的默认行为。它使用挂钟时间戳提取器,并且根据我的理解,窗口是根据挂钟时间生成的。因此,在您的情况下,当您启动应用程序时会创建两个窗口 - 例如 -
"start":[2019,11,20,17,39,30],"end":[2019,11,20,17,40,30]和"start":[2019,11,20,17,40],"end":[2019,11,20,17,41]。因此,两个窗口都会拿起钥匙。但是来自 Kafka Streams 核心团队的人可能想要确认这一点。 -
@sobychacko 感谢您的回复。有没有办法避免这种情况?一开始我不需要2个窗口。是否可以使用消息时间戳而不是挂钟时间戳进行窗口化?
-
@PratapA.K,sobychacko 的帖子听起来很合理。跳跃窗口与纪元对齐(参见kafka.apache.org/23/documentation/streams/developer-guide/…)。如果您需要不同的行为,可以使用
transform()实现自定义窗口。 -
@BrunoCadonna 我不期待不同的行为。根据时间戳为零的 Kafka 文档,应该创建 1 个窗口,但在我的情况下,时间戳为零有 2 个窗口
-
@PratapA.K 您确定从您的第一条记录中提取的时间戳为零,即 1970 年 1 月 1 日吗?
标签: java kafka-consumer-api apache-kafka-streams spring-kafka spring-cloud-stream