【问题标题】:Is it possible to restore a Kafka Streams state store after a restart without using changelog topics?重新启动后是否可以在不使用更改日志主题的情况下恢复 Kafka Streams 状态存储?
【发布时间】:2021-10-16 16:27:59
【问题描述】:

我们有 2 个 compacted 主题,每个主题都包含 TB 级的数据,我们希望使用 Spring Cloud Stream 和 Kafka Streams 来加入这些主题。 (简化的)代码如下所示:

@Bean
public BiConsumer<KTable<String, LeftEvent>, KTable<String, RightEvent>> processEvents() {
    return ((leftEvents, rightEvents) -> {
      leftEvents.join(rightEvents, this::merge)
        .toStream()
        .foreach(this::process);
    });
}

这种方法的问题在于,使用 KTables 作为输入参数会导致创建 changelog 主题,这实际上是 复制 源主题,如上所述,这两个主题都已经压缩了。为了避免在 Kafka 中复制 TB 级数据,我们的第一次尝试是使用 KStreams 作为输入,并将它们转换为 KTables,如下所示:

stream.toTable(
  Materialized
    .<K, V, KeyValueStore<Bytes, byte[]>>as(stateStoreName)
    .withLoggingDisabled()
);

从而禁用日志记录,从而省去变更日志主题,在我们的上下文中这似乎没用。

但是,以下情况现在不再适用:

  1. 使用密钥k1 生成LeftEvent
  2. 重新启动应用程序
  3. 使用密钥k1 生成RightEvent

事件不再加入,但如果应用程序未在其间重新启动(即第 1 步,然后第 3 步),则加入工作正常。

当应用程序重新启动时,我们希望在没有更改日志主题的情况下从源主题重建状态存储,但显然情况并非如此。在某些情况下,我们观察到 RocksDB 文件(位于/tmp/kafka-streams/...)用于检索重启前消耗的数据,但是我们不能假设这些文件在重启后仍然可用,因为我们在容器化环境中工作。

有没有一种方法可以支持重启(并实现容错)而无需使用更改日志主题,在我们的例子中,这些主题与输入主题重复?如果没有,我们可能不得不重新考虑我们对 Kafka Streams 的使用......

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams spring-kafka spring-cloud-stream


    【解决方案1】:

    您想要启用 Kafka Streams 的优化:https://docs.confluent.io/platform/current/streams/developer-guide/optimizing-streams.html#optimization-details(#1 是您要查找的内容)。

    目前,Kafka Streams 在启用时会执行两项优化:

    1. 源 KTable 将源主题重新用作更改日志主题。
    2. 如果可能,Kafka Streams 会将多个重新分区主题合并为一个重新分区主题。

    需要指出的关键一点,因为我自己也犯了这个错误,所以不要忘记将配置发送到 build() 和 KStreams 的构造(优化,如提供的链接中所示)是在构建中完成。

    // tell Kafka Streams to optimize the topology
    config.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
    
    // Since we've configured Streams to use optimizations, the topology is optimized during the build.
    // And because optimizations are enabled, the resulting topology will no longer need to perform
    // three explicit repartitioning steps, but only one.
    final Topology topology = builder.build(config);
    final KafkaStreams streams = new KafkaStreams(topology, config);
    

    现在所有拓扑都启用了优化,因此请记住,还执行了 #2 优化。

    【讨论】:

      猜你喜欢
      • 2021-01-28
      • 1970-01-01
      • 2021-10-11
      • 2020-08-17
      • 2015-01-12
      • 2018-11-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多