【问题标题】:kafka streams hopping windowed aggregation causing multiple windows at timestamp zerokafka 流跳跃窗口聚合导致时间戳为零的多个窗口
【发布时间】:2019-11-20 05:31:46
【问题描述】:

Kafka Streams DSL 窗口聚合导致多个窗口。

@StreamListener("input")
    public void process(KStream<String, Data> DataKStream) {

        JsonSerde<DataAggregator> DataJsonSerde =
                new JsonSerde<>(DataAggregator.class);

        DataKStream
                .groupByKey()
                .windowedBy(TimeWindows.of(60000).advanceBy(30000))
                .aggregate(
                        DataAggregator::new,
                        (key, Data, aggregator) -> aggregator.add(Data),
                        Materialized.with(Serdes.String(), DataJsonSerde)
                );
    }

DataAggregator.java

public class DataAggregator {

    private List<String> dataList = new ArrayList<>();

    public DataAggregator add(Data data) {
        dataList.add(data.getId());
        System.out.println(dataList);
        return this;
    }

    public List<String> getDataList() {
        return dataList;
    }
}

我根据键对输入数据进行分组,然后执行 1 分钟窗口和 30 秒跳跃,在聚合器中我只是收集数据并显示。

我希望在开始时有 1 个窗口,30 秒后会出现另一个窗口。但是实际的输出是不同的,因为开始它本身就创建了 2 个窗口。

预期:

[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6] // till 30 seconds only one window
[6] // new window after 30 seconds
[1, 2, 3, 4, 5, 6, 7]
[6, 7]
[1, 2, 3, 4, 5, 6, 7, 8]
[6, 7, 8]

实际输出:

[1]
[1]
[1, 2]
[1, 2]
[1, 2, 3]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6] // duplicate window even before 30 seconds
[6] // new window after 30 seconds and 1 window from earlier will be dropped
[1, 2, 3, 4, 5, 6, 7]
[6, 7]

因为我在 1 分钟的窗口中创建了 30 秒的希望窗口。我相信,最初应该只有一个窗口,30 秒后应该创建另一个窗口。

有人可以告诉我,实际输出是预期行为还是我遗漏了什么?

注意:我每 4 秒获取一次输入数据,预期/实际输出仅用于表示。

来自 Kafka 文档:

跳跃时间窗口与纪元对齐,间隔较小 bound 是包含的,而上限是排除的。 “与 纪元”表示第一个窗口从时间戳零开始。为了 例如,跳跃窗口大小为 5000ms,提前间隔 3000 毫秒的(“跳跃”)具有可预测的窗口边界 [0;5000),[3000;8000),... - 而不是 [1000;6000),[4000;9000),... 甚至 一些“随机”的东西,例如 [1452;6452),[4452;9452),....

【问题讨论】:

  • 我相信这是来自 Kafka Streams 的默认行为。它使用挂钟时间戳提取器,并且根据我的理解,窗口是根据挂钟时间生成的。因此,在您的情况下,当您启动应用程序时会创建两个窗口 - 例如 - "start":[2019,11,20,17,39,30],"end":[2019,11,20,17,40,30]"start":[2019,11,20,17,40],"end":[2019,11,20,17,41]。因此,两个窗口都会拿起钥匙。但是来自 Kafka Streams 核心团队的人可能想要确认这一点。
  • @sobychacko 感谢您的回复。有没有办法避免这种情况?一开始我不需要2个窗口。是否可以使用消息时间戳而不是挂钟时间戳进行窗口化?
  • @PratapA.K,sobychacko 的帖子听起来很合理。跳跃窗口与纪元对齐(参见kafka.apache.org/23/documentation/streams/developer-guide/…)。如果您需要不同的行为,可以使用 transform() 实现自定义窗口。
  • @BrunoCadonna 我不期待不同的行为。根据时间戳为零的 Kafka 文档,应该创建 1 个窗口,但在我的情况下,时间戳为零有 2 个窗口
  • @PratapA.K 您确定从您的第一条记录中提取的时间戳为零,即 1970 年 1 月 1 日吗?

标签: java kafka-consumer-api apache-kafka-streams spring-kafka spring-cloud-stream


【解决方案1】:

由于您的窗口重叠,每个时间戳都会有多个窗口。对于您的特定窗口配置,您始终会获得 2 个窗口(以毫秒为单位):

[0,60000)   [60000,12000) [12000,18000) ...
     [30000,90000) [90000,15000) ...

您无法更改此行为,但是,您可以在结果上应用filter()(即aggregate(...).filter(...) 以删除您不感兴趣的窗口。

此外,默认情况下,Kafka Streams 使用记录事件时间。有一个WallclockTimestampExtractor,但它仅在您明确设置时使用。参照。 https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-timestamp-extractor

【讨论】:

  • 感谢您的回复。所以,总结一下,如果我有 5 分钟的窗口和 1 分钟的跳跃,那么一开始就会有 5 个窗口。这是正确的吗?
  • 嗯,取决于你所说的“一开始”是什么意思。如果“开始”是 0 比没有,因为它是极端情况。如果“开始”是 2019 年的某个时间戳,那是的。
猜你喜欢
  • 1970-01-01
  • 2022-12-08
  • 1970-01-01
  • 2019-10-15
  • 1970-01-01
  • 1970-01-01
  • 2016-10-08
  • 1970-01-01
  • 2020-10-03
相关资源
最近更新 更多