问题在于 Streams 在窗口期间自动包装显式 serde,但不会自动包装默认 serde 的方式存在令人困惑的不对称性。恕我直言,这是一个应该纠正的疏忽,所以我提交了:https://issues.apache.org/jira/browse/KAFKA-7806
正如其他人所指出的,解决方案是在上游显式设置密钥 serde,而不依赖于默认密钥 serde。您可以:
使用Materialized在窗口聚合上设置serdes
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
(如尼舒推荐的)
(请注意,不需要命名count操作,这样做的副作用是使其可查询)
或者在上游设置 serdes,例如在输入端:
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
(正如 wardziniak 推荐的)
选择权在你;我认为在这种情况下,这两种情况都没有太大的不同。如果您进行与count 不同的聚合,那么无论如何您可能会通过Materialized 设置值serde,所以也许前者会是一种更统一的样式。
我还注意到您的窗口定义没有设置宽限期。窗口关闭时间定义为window end + grace period,默认为 24 小时,因此在 24 小时的数据通过应用程序运行之前,您不会看到抑制发出的任何内容。
对于您的测试工作,我建议您尝试:
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
在生产中,您需要选择一个宽限期,以平衡您期望在流中的事件延迟量与您希望从抑制中看到的发射及时量。
最后一点,我在您的要点中注意到您没有更改默认缓存或提交间隔。因此,您会注意到 count 运算符本身将在默认的 30 秒内缓冲更新,然后再将它们传递给抑制。这是一个很好的生产配置,因此您不会对本地磁盘或 Kafka 代理造成瓶颈。但它可能会在您测试时让您感到惊讶。
通常对于测试(或以交互方式尝试的东西),我将禁用缓存并将提交间隔设置为短以最大限度地提高开发人员的理智:
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
对于 serde 的疏忽,我们深表歉意。我希望我们能尽快解决 KAFKA-7806。
我希望这会有所帮助!