【发布时间】:2021-10-16 16:27:59
【问题描述】:
我们有 2 个 compacted 主题,每个主题都包含 TB 级的数据,我们希望使用 Spring Cloud Stream 和 Kafka Streams 来加入这些主题。 (简化的)代码如下所示:
@Bean
public BiConsumer<KTable<String, LeftEvent>, KTable<String, RightEvent>> processEvents() {
return ((leftEvents, rightEvents) -> {
leftEvents.join(rightEvents, this::merge)
.toStream()
.foreach(this::process);
});
}
这种方法的问题在于,使用 KTables 作为输入参数会导致创建 changelog 主题,这实际上是 复制 源主题,如上所述,这两个主题都已经压缩了。为了避免在 Kafka 中复制 TB 级数据,我们的第一次尝试是使用 KStreams 作为输入,并将它们转换为 KTables,如下所示:
stream.toTable(
Materialized
.<K, V, KeyValueStore<Bytes, byte[]>>as(stateStoreName)
.withLoggingDisabled()
);
从而禁用日志记录,从而省去变更日志主题,在我们的上下文中这似乎没用。
但是,以下情况现在不再适用:
- 使用密钥
k1生成LeftEvent - 重新启动应用程序
- 使用密钥
k1生成RightEvent
事件不再加入,但如果应用程序未在其间重新启动(即第 1 步,然后第 3 步),则加入工作正常。
当应用程序重新启动时,我们希望在没有更改日志主题的情况下从源主题重建状态存储,但显然情况并非如此。在某些情况下,我们观察到 RocksDB 文件(位于/tmp/kafka-streams/...)用于检索重启前消耗的数据,但是我们不能假设这些文件在重启后仍然可用,因为我们在容器化环境中工作。
有没有一种方法可以支持重启(并实现容错)而无需使用更改日志主题,在我们的例子中,这些主题与输入主题重复?如果没有,我们可能不得不重新考虑我们对 Kafka Streams 的使用......
【问题讨论】:
标签: java apache-kafka apache-kafka-streams spring-kafka spring-cloud-stream