【发布时间】:2020-06-25 19:04:40
【问题描述】:
我正在尝试使用拓扑测试驱动程序,它需要在其构造函数中使用拓扑。 然而,虽然应用程序本身运行良好,但它在我的 KStream 单元测试中失败并出现以下错误:
"StateStore ... is already added"
这是我要测试(缩短)的 KStream:
@Bean
public KStream<...,...> kstream(StreamsBuilder builder) {
builder.addStateStore(...);
KStream<...,...> stream = builder.stream().filter(...).etc()
stream.process(()->...);
return stream;
}
我的测试(缩短)
def "..."() {
given:
...
StreamsBuilder builder = new StreamsBuilder();
MyStreamService myStreamService = new MyStreamService(...stubbed);
KStream mykStream = myStreamService.kStream(builder);
TopologyTestDriver driver = new TopologyTestDriver(builder.build(), ...)
...
一旦我运行 builder.build() 来获取拓扑,它就会引发上述错误 - 但是我不明白为什么,因为我只在那个地方调用了一次 addStateStore。我删除了除 .addStoreStore() 方法之外的整个流逻辑,以查看是否有任何其他方法将初始化它(映射、过滤器、进程等)无济于事。
我知道还有其他方法可以测试 Kafka 流,但我特别试图让它按照上面解释的方式工作。如果这是不可能的,那没关系。
【问题讨论】:
标签: spring-boot unit-testing apache-kafka topology