【问题标题】:Suppress triggers events only when new events are received on the stream仅当在流上接收到新事件时才抑制触发事件
【发布时间】:2019-10-26 14:26:51
【问题描述】:

我正在使用 Kafka 流 2.2.1。

我正在使用抑制来阻止事件,直到窗口关闭。我正在使用事件时间语义。 但是,触发消息仅在流上有新消息可用时才会触发。

提取以下代码对问题进行示例:

        KStream<UUID, String>[] branches = is
            .branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
                    (key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
                    (key, value) -> true);

    KStream<UUID, String> sideA = branches[0];
    KStream<UUID, String> sideB = branches[1];

    KStream<Windowed<UUID>, String> sideASuppressed =
            sideA.groupByKey(
                    Grouped.with(new MyUUIDSerde(),
                    Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
            .reduce((v1, v2) -> {
                return v1;
            })
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream();

消息仅在新消息到达“sideA”流时从“sideASuppressed”流式传输(到达“sideB”的消息不会导致抑制发出任何消息,即使窗口关闭时间已经过去很久了)。 尽管在生产中由于容量很大,问题可能不会发生太多,但在很多情况下,不必等待进入“sideA”流的新消息。

提前致谢。

【问题讨论】:

  • 这是预期的行为——如果没有数据到达,事件时间不会改变,因此无法关闭窗口。
  • 这里的问题有点不同。新事件仍会到达系统,因此,系统的事件时间确实发生了变化。但是,被抑制的消息在特定流上被抑制。只要消息没有到达这个特定的流,窗口就不会关闭并且被抑制的消息仍然被抑制。向该特定流生成消息以强制关闭窗口是可能的,但是,需要为代码中的每个抑制实现并显着损害代码的可读性
  • 明白了——再说一遍,这是设计使然。更多细节请查看原始设计文档:cwiki.apache.org/confluence/display/KAFKA/…
  • 此处列出了一种可能的解决方案:stackoverflow.com/a/60824254/458370

标签: apache-kafka-streams


【解决方案1】:

根据 Kafka 流文档:

只有在所有输入主题的所有输入分区都有可用的新数据(具有更新的时间戳)时,流时间才会提前。如果至少一个分区没有任何新数据可用,则流时间不会提前,因此如果指定了 PunctuationType.STREAM_TIME,则不会触发 punctuate()。此行为与配置的时间戳提取器无关,即使用 WallclockTimestampExtractor 不会启用 punctuate() 的挂钟触发。

我不确定为什么会出现这种情况,但是,它解释了为什么仅当消息在它使用的队列中可用时才发出被抑制的消息。

如果有人对为什么会这样实施有答案,我将很乐意学习。这种行为导致我的实现发出消息只是为了让我的抑制消息及时发出,并导致代码的可读性大大降低。

【讨论】:

  • 您的报价似乎是针对“标点符号”...以这种方式实现suppress()的原因有很多——总的来说,它很复杂。查看原始设计文档:cwiki.apache.org/confluence/display/KAFKA/…
  • 谢谢。我明白。问题是,使用转换器/处理器实现抑制比使用实际抑制操作实现抑制更容易,因为它需要用“控制”消息馈送每个被抑制的流,然后忽略它们(因为它们不需要逻辑)以确保流时间进展
  • 好吧——如果你手动实现它,你会得到不同的语义——即使没有输入数据,你似乎也希望“流时间”继续进行,考虑到“流时间”。不过,您可能对此 KIP 感兴趣:cwiki.apache.org/confluence/display/KAFKA/…
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-09-06
  • 1970-01-01
  • 2023-04-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-12-27
相关资源
最近更新 更多