【问题标题】:Why do I have to configure a state store with Kafka Streams为什么我必须使用 Kafka Streams 配置状态存储
【发布时间】:2020-04-01 08:08:39
【问题描述】:

目前我有以下设置:

StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("kafka.topics.table"),
    new SomeKeySerde(),
    new SomeValueSerde());

streamsBuilder.addStateStore(storeBuilder);

final KStream<byte[], SomeClass> requestsStream = streamsBuilder
            .stream("myTopic", Consumed.with(Serdes.ByteArray(), theSerde));
    requestsStream
            .filter((key, request) -> Objects.nonNull(request))
            .process(() -> new SomeClassUpdater("kafka.topics.table", maxNumMatches), "kafka.topics.table");

Properties streamsConfiguration = loadConfiguration();
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration);

streams.start()

为什么我需要本地状态存储,因为我没有使用它进行任何其他计算,并且数据也存储在 kafka 更改日志中?还有它在什么时候存储在本地存储中,是否存储并提交到更改日志?

我面临的问题是我在本地存储,并且及时我遇到了内存问题,尤其是当它经常重新分区时。因为旧分区仍然闲置并填满内存。 所以我的问题是,为什么我们需要使用 RocksDB 的持久性,因为:

  1. 数据保存在kafka changelog中
  2. 当容器消失时,ramdisk 也消失了。

【问题讨论】:

  • 在 Kafka Streams 中不需要状态存储。您是否正在访问处理器中的状态存储?当您尝试从代码中删除它时会发生什么?
  • @ck1 不,我没有在处理器中使用它,但我仍然想知道如果增加 ram 磁盘,人们如何使用它。那么这一切的收获是什么
  • 如果不使用,为什么要创建状态存储并将其添加到拓扑和处理器中?请注意,在process() 中采用状态存储名称的最后一个参数是可选的,您可以使用单个参数调用该方法。因此,也不需要通过addStateStore() 添加状态存储。

标签: apache-kafka apache-kafka-streams rocksdb


【解决方案1】:

在单个线程上,我们可以有多个等于 no 的任务。主题的分区。每个分区都有自己的状态存储,这些状态存储将数据保存到 Kafka 内部主题的变更日志中。 分区的每个状态存储还维护其他分区的状态存储的副本,为了恢复任务可能失败的分区的数据。

如果您不使用状态存储,并且您的一项任务失败,它将转到内部主题,即 Changelog,然后将为分区获取数据,这对 CPU 来说是一项耗时的工作。因此,维护状态存储减少了任务可能失败的时间,并立即从另一个任务状态存储中获取数据。

【讨论】:

  • 不知道为什么这回答了这个问题?这个答案解释了状态存储是如何工作的,但问题是“为什么我需要使用状态存储”,这个问题的答案是:你不需要使用状态存储......
猜你喜欢
  • 1970-01-01
  • 2019-07-03
  • 1970-01-01
  • 1970-01-01
  • 2018-10-24
  • 2021-01-04
  • 2021-10-11
  • 2019-07-14
  • 1970-01-01
相关资源
最近更新 更多