【问题标题】:Kafka streams dropping messages during windowing and on restart卡夫卡流在窗口和重新启动期间丢弃消息
【发布时间】:2020-03-02 17:41:31
【问题描述】:

我正在开发具有以下拓扑的 Kafka Streams 应用程序:

private final Initializer<Set<String>> eventInitializer = () -> new HashSet<>();

final StreamsBuilder streamBuilder = new StreamsBuilder();

    final KStream<String, AggQuantityByPrimeValue> eventStreams = streamBuilder.stream("testTopic",
            Consumed.with(Serdes.String(), **valueSerde**));

    final  KStream<String, Value> filteredStreams = eventStreams
                .filter((key,clientRecord)->recordValidator.isAllowedByRules(clientRecord));

    final KGroupedStream<Integer, Value> groupedStreams = filteredStreams.groupBy(
        (key, transactionEntry) -> transactionEntry.getNodeid(),
        Serialized.with(Serdes.Integer(), **valueSerde**));

    /* Hopping window */
    final TimeWindowedKStream<Integer, Value> windowedGroupStreams = groupedStreams
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(25))
            .grace(Duration.ofSeconds(0)));

    /* Aggregating the events */
    final KStream<Windowed<Integer>, Set<String>> suppressedStreams = windowedGroupStreams
        .aggregate(eventInitializer, countAggregator, Materialized.as("counts-aggregate")
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
                .withName("suppress-window")
            .toStream();

    suppressedStreams.foreach((windowed, value) -> eventProcessor.publish(windowed.key(), value)); 

    return new KafkaStreams(streamBuilder.build(), config.getKafkaConfigForStreams());

我观察到在窗口期间/之后间歇性地很少有事件被丢弃。 例如:

  • 可以在 isAllowedByRules() 方法中查看/打印所有记录,这些记录有效(过滤器允许)并由流使用。
  • 但是当打印 countAggregator 中的事件时,我可以看到很少有事件没有通过它。

流的当前配置:

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG,"kafka-app-id"
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, <bootstraps-server>); 
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
streamsConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10485760);
streamsConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
streamsConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760);
/*For window buffering across all threads*/
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 52428800);

streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, **customSerdesForSet**);

最初,我使用的是翻转窗口,但我发现大多数情况下在窗口结束时很少有事件丢失,所以我改为使用跳跃窗口(复制比丢失更好)。然后丢弃的事件变为零。但是今天,在将近 4 天之后,我再次看到了一些被丢弃的事件,其中有一个模式,即与一起制作的其他事件相比,它们晚了近一分钟。但随后的期望是,这些迟到的事件应该在未来的任何窗口中出现,但这并没有发生。如果我的理解不正确,请在这里纠正我。

正如我在主题中提到的,在重新启动流时(优雅地)我可以看到在聚合步骤中很少有事件再次丢失,尽管是由 isAllowedByRules() 方法处理的。

我在堆栈溢出和其他网站上进行了很多搜索,但找不到这种行为的根本原因。是否与我缺少/未正确设置的某些配置有关,或者可能是由于其他原因?

【问题讨论】:

  • 任何可能的 transactionEntry.getNodeid() 可以返回 null (从而解释一些下降)?
  • But then expectation is that these late events should come in any of the future windows but that didn't happen -- 正如@Yannick 下面的回答中提到的那样,默认情况下不会发生这种情况,因为默认情况下会使用事件时间语义。也许将 timestamp.extractor 更改为 WallclockTimestampExtractor 可以解决您的问题。
  • 嗨@MatthiasJ.Sax 流感谢您的回复。它工作了很长时间,但我又可以看到消息被丢弃了。根据您的建议,我已将 timestamp.extractor 更改为 WallclockTimestampExtractor。我还添加了宽限期。当前配置是: Kafka 窗口时间:30s 窗口提前时间:25s 宽限期:60s 提交间隔:100s 是否可以获得流时间,以便我可以将消息时间戳与流时间进行比较,并检查它是否真的很晚!这也可能是由于乱序事件而发生的。有什么方法可以找到它们吗?
  • ATM,流时间不可访问。我们有一个 KIP,希望在 AK 2.8 版本中添加此功能:cwiki.apache.org/confluence/display/KAFKA/…
  • 好的,谢谢@MatthiasJ.Sax 提供的信息。

标签: apache-kafka apache-kafka-streams windowing


【解决方案1】:

据我了解,您的宽限期为空:

 /* Hopping window */
...
            .grace(Duration.ofSeconds(0))

因此,您的窗口已关闭,不允许任何迟到。

然后关于您的子问题: But then expectation is that these late events should come in any of the future windows but that didn't happen. Correct me here if my understanding is not right.

也许您正在混合事件时间和处理时间。 如果记录的时间戳(由生产者在生产时添加,或者如果生产者未设置,则在到达集群时由代理添加)在您当前的窗口之外,您的记录将被归类为“迟到”。

这是一个带有 2 条记录“*”的示例。

他们的事件时间(et1 和 et2)适合窗口:

 |    window       |
 t1                t2
 |      *    *     |
       et1  et2          

但是,et2 (pt2) 的处理时间实际上如下:

 |    window       |
 t1                t2
 |      *          |   *
       pt1            pt2

这里的窗口是 t1 和 t2 之间的时间片(处理时间) et1 和 et2 分别是 2 条记录“*”的事件时间。 et1 和 et2 是记录本身中设置的时间戳。 在这个例子中,et1和et2在t1和t2之间,et2是在窗口关闭后收到的,因为你的宽限期是0,它会被跳过。

可能是一个解释

【讨论】:

  • 只是一个小评论:如果记录在窗口结束时间之后,则不需要 late -- 只有在宽限期——否则只是无序
猜你喜欢
  • 2017-06-12
  • 2021-01-26
  • 1970-01-01
  • 1970-01-01
  • 2018-06-01
  • 2016-03-06
  • 1970-01-01
  • 2023-04-06
  • 2019-06-12
相关资源
最近更新 更多