这个问题已经很老了,所以我假设您在此期间找到了解决方案。但是,以防万一它对其他人有所帮助,我发现我的 ProducerInterceptor 类,它根据消息的内容将消息分派到不同的主题,除非我的流已经具有指定的输出,否则不会被调用。
我的第一次尝试看起来像这样,因为我认为我不需要指定输出主题。这不起作用:
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
但这确实:
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic").through("dummy-output-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
值得注意的是,在第二个示例中,没有任何内容发布到 dummy-output-topic,而且使用 to 而不是 through 似乎也以相同的方式工作。
在我的例子中,我调用map 来更改记录,然后使用拦截器将它们分派到不同的主题,所以我的代码实际上看起来更像这样:
val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
.map(new CustomKeyValueMapper)
.through("dummy-output-topic")
val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()
我希望这些示例可以帮助任何与 ProducerInterceptors 一起工作的人,他们犯了同样的错误。