【发布时间】:2019-03-19 17:29:58
【问题描述】:
在使用 Spring Cloud Stream 连接到 Kafka 的 Spring Boot 应用程序中,我尝试设置两个单独的流侦听器方法:
- 一个从主题“t1”和“t2”作为 KTable 读取,在一个中使用不同的键重新分区,然后连接到另一个中的数据
- 另一个从不相关的主题“t3”中读取为 KStream。
因为第一个监听器做了一些加入和聚合,一些主题是自动创建的,例如“test-1-KTABLE-AGGREGATE-STATE-STORE-0000000007-repartition-0”。 (不确定这是否与问题有关。)
当我通过使用 @StreamListener 注释的两个单独方法来设置代码时,在 Spring Boot 应用程序启动时出现以下错误:
Exception in thread "test-d44cb424-7575-4f5f-b148-afad034c93f4-StreamThread-2" java.lang.IllegalArgumentException: Assigned partition t1-0 for non-subscribed topic regex pattern; subscription pattern is t3
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)
我认为重要的部分是:“为非订阅主题正则表达式模式分配的分区 t1-0;订阅模式是 t3”。这是两个不相关的主题,据我所知,与 t3 相关的任何内容都不应订阅与 t1 相关的任何内容。导致问题的确切主题也会间歇性地变化:有时它是提到的自动生成的主题之一,而不是 t1 本身。
以下是两个流侦听器的设置方式(在 Kotlin 中):
@StreamListener
fun listenerForT1AndT2(
@Input("t1") t1KTable: KTable<String, T1Obj>,
@Input("t2") t2KTable: KTable<String, T2Obj>) {
t2KTable
.groupBy(...)
.aggregate(
{ ... },
{ ... },
{ ... },
Materialized.with(Serdes.String(), JsonSerde(SomeObj::class.java)))
.join(t1KTable,
{ ... },
Materialized.`as`<String, SomeObj, KeyValueStore<Bytes, ByteArray>>("test")
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde(SomeObj::class.java)))
}
@StreamListener
fun listenerForT3(@Input("t3") t3KStream: KStream<String, T3Obj>) {
events.map { ... }
}
但是,当我设置我的代码时,我只用@StreamListener 注释的一个方法,并为所有三个主题获取参数,一切正常,例如
@StreamListener
fun compositeListener(
@Input("t1") t1KTable: KTable<String, T1Obj>,
@Input("t2") t2KTable: KTable<String, T2Obj>,
@Input("t3") t3KStream: KStream<String, T3Obj>) {
...
}
但我认为我只能有一个@StreamListener 方法是不对的。
我知道有 content-based routing 用于向 StreamListener 注释添加条件,但是如果方法定义了输入通道,那么我不确定是否需要在这里使用它 - 我本来以为在方法参数上使用@Input 注释就足以告诉系统绑定到哪些通道(以及哪些Kafka 主题)?如果我确实需要使用基于内容的路由,我如何在此处应用它以使每个方法仅接收来自相关主题的项目?
我还尝试将两个侦听器方法分离为两个单独的类,每个类都有 @EnableBinding 仅用于它感兴趣的接口(即一个接口用于 t1 和 t2,一个单独的接口用于 t3),但这无济于事。
我发现的与此错误消息相关的所有其他内容,例如here,是关于拥有多个应用程序实例,但在我的情况下,只有一个 Spring Boot 应用程序实例。
【问题讨论】:
标签: apache-kafka spring-cloud-stream spring-kafka