【问题标题】:Kafka Streams - Using An Existing State Store After Adding a New Source StreamKafka Streams - 添加新源流后使用现有状态存储
【发布时间】:2020-07-09 23:48:42
【问题描述】:

我有一个现有的流,它使用两个主题作为其来源:

val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")

stream1
  .merge(stream2)
  .groupByKey
  .reduce(reduceValues)
  .toStream
  .to("result-topic")

StateStore 的自动生成名称是 KSTREAM-REDUCE-STATE-STORE-0000000003

现在我需要再添加一个主题作为来源。但是,添加新源会增加 a kafka-internal number,导致 StateStore 变为 KSTREAM-REDUCE-STATE-STORE-0000000005。我不想丢失当前状态,所以我明确提供了旧的StateStore 的名称:

val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
val stream3 = streamsBuilder.stream[K, V]("topic3") // new topic

stream1
  .merge(stream2)
  .merge(stream3) // merge new topic
  .groupByKey
  .reduce(reduceValues)(Materialized.as("KSTREAM-REDUCE-STATE-STORE-0000000003")
  .toStream
  .to("result-topic")

它似乎有效,但我不确定我是否在干扰 Kafka 内部,因为:

  1. 我正在使用 Kafka 自动生成的自定义名称(可能存在名称冲突?)
  2. 用于提供此StateStore 的流集与最初不同。

有没有cmets?

【问题讨论】:

    标签: scala apache-kafka apache-kafka-streams


    【解决方案1】:

    老实说,最安全的选择是将人类可读的名称添加到此状态,但正如您所提到的,您将失去它。

    我认为你所做的应该没有任何问题(至少在你引入另一个代码更改之前:))。 ID 0000000003 将分配给 groupByKey 操作员,因此不会有任何冲突(尽管我不是 100% 确定那里的 Kafka Streams 内部结构)。

    还有Application Reset Tool 允许您重新生成聚合。但我不知道它是否适用于您的情况:您对输入主题的保留政策可能会阻止此工具重新生成精确的聚合。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-07-03
      • 1970-01-01
      • 2018-10-24
      • 2021-01-04
      • 2021-10-11
      • 2019-07-14
      • 1970-01-01
      • 2020-01-20
      相关资源
      最近更新 更多