【发布时间】:2020-03-02 17:41:31
【问题描述】:
我正在开发具有以下拓扑的 Kafka Streams 应用程序:
private final Initializer<Set<String>> eventInitializer = () -> new HashSet<>();
final StreamsBuilder streamBuilder = new StreamsBuilder();
final KStream<String, AggQuantityByPrimeValue> eventStreams = streamBuilder.stream("testTopic",
Consumed.with(Serdes.String(), **valueSerde**));
final KStream<String, Value> filteredStreams = eventStreams
.filter((key,clientRecord)->recordValidator.isAllowedByRules(clientRecord));
final KGroupedStream<Integer, Value> groupedStreams = filteredStreams.groupBy(
(key, transactionEntry) -> transactionEntry.getNodeid(),
Serialized.with(Serdes.Integer(), **valueSerde**));
/* Hopping window */
final TimeWindowedKStream<Integer, Value> windowedGroupStreams = groupedStreams
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(25))
.grace(Duration.ofSeconds(0)));
/* Aggregating the events */
final KStream<Windowed<Integer>, Set<String>> suppressedStreams = windowedGroupStreams
.aggregate(eventInitializer, countAggregator, Materialized.as("counts-aggregate")
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
.withName("suppress-window")
.toStream();
suppressedStreams.foreach((windowed, value) -> eventProcessor.publish(windowed.key(), value));
return new KafkaStreams(streamBuilder.build(), config.getKafkaConfigForStreams());
我观察到在窗口期间/之后间歇性地很少有事件被丢弃。 例如:
- 可以在 isAllowedByRules() 方法中查看/打印所有记录,这些记录有效(过滤器允许)并由流使用。
- 但是当打印 countAggregator 中的事件时,我可以看到很少有事件没有通过它。
流的当前配置:
Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG,"kafka-app-id"
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, <bootstraps-server>);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
streamsConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10485760);
streamsConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
streamsConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760);
/*For window buffering across all threads*/
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 52428800);
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, **customSerdesForSet**);
最初,我使用的是翻转窗口,但我发现大多数情况下在窗口结束时很少有事件丢失,所以我改为使用跳跃窗口(复制比丢失更好)。然后丢弃的事件变为零。但是今天,在将近 4 天之后,我再次看到了一些被丢弃的事件,其中有一个模式,即与一起制作的其他事件相比,它们晚了近一分钟。但随后的期望是,这些迟到的事件应该在未来的任何窗口中出现,但这并没有发生。如果我的理解不正确,请在这里纠正我。
正如我在主题中提到的,在重新启动流时(优雅地)我可以看到在聚合步骤中很少有事件再次丢失,尽管是由 isAllowedByRules() 方法处理的。
我在堆栈溢出和其他网站上进行了很多搜索,但找不到这种行为的根本原因。是否与我缺少/未正确设置的某些配置有关,或者可能是由于其他原因?
【问题讨论】:
-
任何可能的 transactionEntry.getNodeid() 可以返回 null (从而解释一些下降)?
-
But then expectation is that these late events should come in any of the future windows but that didn't happen-- 正如@Yannick 下面的回答中提到的那样,默认情况下不会发生这种情况,因为默认情况下会使用事件时间语义。也许将timestamp.extractor更改为WallclockTimestampExtractor可以解决您的问题。 -
嗨@MatthiasJ.Sax 流感谢您的回复。它工作了很长时间,但我又可以看到消息被丢弃了。根据您的建议,我已将
timestamp.extractor更改为WallclockTimestampExtractor。我还添加了宽限期。当前配置是: Kafka 窗口时间:30s 窗口提前时间:25s 宽限期:60s 提交间隔:100s 是否可以获得流时间,以便我可以将消息时间戳与流时间进行比较,并检查它是否真的很晚!这也可能是由于乱序事件而发生的。有什么方法可以找到它们吗? -
ATM,流时间不可访问。我们有一个 KIP,希望在 AK 2.8 版本中添加此功能:cwiki.apache.org/confluence/display/KAFKA/…
-
好的,谢谢@MatthiasJ.Sax 提供的信息。
标签: apache-kafka apache-kafka-streams windowing