【发布时间】:2020-07-09 23:48:42
【问题描述】:
我有一个现有的流,它使用两个主题作为其来源:
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
stream1
.merge(stream2)
.groupByKey
.reduce(reduceValues)
.toStream
.to("result-topic")
StateStore 的自动生成名称是 KSTREAM-REDUCE-STATE-STORE-0000000003。
现在我需要再添加一个主题作为来源。但是,添加新源会增加 a kafka-internal number,导致 StateStore 变为 KSTREAM-REDUCE-STATE-STORE-0000000005。我不想丢失当前状态,所以我明确提供了旧的StateStore 的名称:
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
val stream3 = streamsBuilder.stream[K, V]("topic3") // new topic
stream1
.merge(stream2)
.merge(stream3) // merge new topic
.groupByKey
.reduce(reduceValues)(Materialized.as("KSTREAM-REDUCE-STATE-STORE-0000000003")
.toStream
.to("result-topic")
它似乎有效,但我不确定我是否在干扰 Kafka 内部,因为:
- 我正在使用 Kafka 自动生成的自定义名称(可能存在名称冲突?)
- 用于提供此
StateStore的流集与最初不同。
有没有cmets?
【问题讨论】:
标签: scala apache-kafka apache-kafka-streams