【问题标题】:Kafka Streams windowing aggregation batchingKafka Streams 窗口聚合批处理
【发布时间】:2018-09-24 16:24:51
【问题描述】:

我的应用程序中有 Kafka Streams 处理:

myStream
    .mapValues(customTransformer::transform)
    .groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
    .windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
    .aggregate(CustomCollectorObject::new,
            (key, value, aggregate) -> aggregate.collect(value),
            Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
                    .withValueSerde(new CustomCollectorSerde()))
    .toStream()
    .foreach((k, v) -> /* do something very important */);

预期行为:传入消息按键分组,并在某个时间间隔内聚合到CustomCollectorObjectCustomCollectorObject 只是一个内部有 List 的类。在foreach 中每 10 秒后,我正在对我的聚合数据做一些非常重要的事情。非常重要的是我希望每 10 秒调用一次 foreach

实际行为:我可以看到我的foreach 中的处理被称为罕见,大约每 30-35 秒一次,这并不重要。非常重要的是,我一次收到 3-4 条消息。

问题是:我怎样才能达到预期的行为?我需要在运行时及时处理我的数据。

我尝试设置cache.max.bytes.buffering: 0,但在这种情况下,窗口根本不起作用。

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    Kafka Streams 具有不同的执行模型并提供不同的语义,即您的期望与 Kafka Streams 所做的不匹配。已经有多个类似的问题:

    另外请注意,社区目前正在开发一个名为 suppress() 的新运算符,它将能够提供您想要的语义:https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables

    目前,您需要添加带有状态存储的 transform(),并使用标点符号来获得所需的语义(c.f. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a-stream-processor

    【讨论】:

    • 感谢您的回复!看起来suppress() 确实是我正在寻找的东西。目前,我已通过以下方式解决了我的情况: 1. set commit.interval.ms 与窗口持续时间具有相同的值。 2. 转换为流后添加过滤器,以检查窗口是否完成。如果是这样 - 做一些重要的事情
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-15
    • 1970-01-01
    • 2018-08-21
    • 1970-01-01
    • 1970-01-01
    • 2022-01-22
    相关资源
    最近更新 更多