【发布时间】:2018-06-08 09:17:24
【问题描述】:
我有一个需要监听多个不同主题的应用程序;每个主题对于如何处理消息都有单独的逻辑。我曾想过为每个 KafkaStreams 实例使用相同的 kafka 属性,但我收到如下错误。
错误
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
代码 (kotlin)
class KafkaSetup() {
companion object {
private val LOG = LoggerFactory.getLogger(this::class.java)
}
fun getProperties(): Properties {
val properties = Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
return properties
}
private fun listenOnMyTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
kStream.foreach { key, value -> LOG.info("do stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
private fun listenOnMyOtherTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")
kStream.foreach { key, value -> LOG.info("do other stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
}
我发现这个reference 表明您不能将application.id 用于多个主题,但是我发现很难找到支持它的参考文档。 documentation for application.id 声明:
流处理应用程序的标识符。在 Kafka 集群中必须是唯一的。它用作 1) 默认客户端 ID 前缀,2) 用于成员管理的组 ID,3) 变更日志主题前缀。
问题
- 此错误是什么意思,是什么原因造成的。
- 鉴于您可以让应用程序的多个实例以相同的 id 运行以从多个主题分区消费,“在 Kafka 集群中必须是唯一的”是什么意思?
- 您能否使用相同的 Kafka 流
application.id来启动两个列出不同主题的KafkaStreams?如果有,怎么做?
详情: kafka 0.11.0.2
【问题讨论】:
标签: apache-kafka apache-kafka-streams