【发布时间】:2018-09-24 16:24:51
【问题描述】:
我的应用程序中有 Kafka Streams 处理:
myStream
.mapValues(customTransformer::transform)
.groupByKey(Serialized.with(new Serdes.StringSerde(), new SomeCustomSerde()))
.windowedBy(TimeWindows.of(10000L).advanceBy(10000L))
.aggregate(CustomCollectorObject::new,
(key, value, aggregate) -> aggregate.collect(value),
Materialized.<String, CustomCollectorObject, WindowStore<Bytes, byte[]>>as("some_store_name")
.withValueSerde(new CustomCollectorSerde()))
.toStream()
.foreach((k, v) -> /* do something very important */);
预期行为:传入消息按键分组,并在某个时间间隔内聚合到CustomCollectorObject。 CustomCollectorObject 只是一个内部有 List 的类。在foreach 中每 10 秒后,我正在对我的聚合数据做一些非常重要的事情。非常重要的是我希望每 10 秒调用一次 foreach!
实际行为:我可以看到我的foreach 中的处理被称为罕见,大约每 30-35 秒一次,这并不重要。非常重要的是,我一次收到 3-4 条消息。
问题是:我怎样才能达到预期的行为?我需要在运行时及时处理我的数据。
我尝试设置cache.max.bytes.buffering: 0,但在这种情况下,窗口根本不起作用。
【问题讨论】:
标签: java apache-kafka apache-kafka-streams