【问题标题】:Where to add StateStore in SpringBoot Kafka Stream在 Spring Boot Kafka Stream 中添加状态存储的位置
【发布时间】:2020-06-25 19:04:40
【问题描述】:

我正在尝试使用拓扑测试驱动程序,它需要在其构造函数中使用拓扑。 然而,虽然应用程序本身运行良好,但它在我的 KStream 单元测试中失败并出现以下错误:

"StateStore ... is already added"

这是我要测试(缩短)的 KStream:

@Bean
public KStream<...,...> kstream(StreamsBuilder builder) {
  builder.addStateStore(...);
  KStream<...,...> stream = builder.stream().filter(...).etc()
  stream.process(()->...);
  return stream;
}

我的测试(缩短)

def "..."() {
  given:
    ...
    StreamsBuilder builder = new StreamsBuilder();
    MyStreamService myStreamService = new MyStreamService(...stubbed);
    KStream mykStream = myStreamService.kStream(builder);
    TopologyTestDriver driver = new TopologyTestDriver(builder.build(), ...)
    ...

一旦我运行 builder.build() 来获取拓扑,它就会引发上述错误 - 但是我不明白为什么,因为我只在那个地方调用了一次 addStateStore。我删除了除 .addStoreStore() 方法之外的整个流逻辑,以查看是否有任何其他方法将初始化它(映射、过滤器、进程等)无济于事。

我知道还有其他方法可以测试 Kafka 流,但我特别试图让它按照上面解释的方式工作。如果这是不可能的,那没关系。

【问题讨论】:

    标签: spring-boot unit-testing apache-kafka topology


    【解决方案1】:

    在 application.yml 文件中尝试添加以下行: spring:kafka:streams:state-dir: "dir-path" kstreams 将为状态存储使用“dir-path”目录。

    【讨论】:

      猜你喜欢
      • 2023-03-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-21
      • 2017-05-22
      • 2021-07-28
      • 1970-01-01
      相关资源
      最近更新 更多