【发布时间】:2019-10-26 14:26:51
【问题描述】:
我正在使用 Kafka 流 2.2.1。
我正在使用抑制来阻止事件,直到窗口关闭。我正在使用事件时间语义。 但是,触发消息仅在流上有新消息可用时才会触发。
提取以下代码对问题进行示例:
KStream<UUID, String>[] branches = is
.branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
(key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
(key, value) -> true);
KStream<UUID, String> sideA = branches[0];
KStream<UUID, String> sideB = branches[1];
KStream<Windowed<UUID>, String> sideASuppressed =
sideA.groupByKey(
Grouped.with(new MyUUIDSerde(),
Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
.reduce((v1, v2) -> {
return v1;
})
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();
消息仅在新消息到达“sideA”流时从“sideASuppressed”流式传输(到达“sideB”的消息不会导致抑制发出任何消息,即使窗口关闭时间已经过去很久了)。 尽管在生产中由于容量很大,问题可能不会发生太多,但在很多情况下,不必等待进入“sideA”流的新消息。
提前致谢。
【问题讨论】:
-
这是预期的行为——如果没有数据到达,事件时间不会改变,因此无法关闭窗口。
-
这里的问题有点不同。新事件仍会到达系统,因此,系统的事件时间确实发生了变化。但是,被抑制的消息在特定流上被抑制。只要消息没有到达这个特定的流,窗口就不会关闭并且被抑制的消息仍然被抑制。向该特定流生成消息以强制关闭窗口是可能的,但是,需要为代码中的每个抑制实现并显着损害代码的可读性
-
明白了——再说一遍,这是设计使然。更多细节请查看原始设计文档:cwiki.apache.org/confluence/display/KAFKA/…
-
此处列出了一种可能的解决方案:stackoverflow.com/a/60824254/458370