【发布时间】:2020-09-12 20:27:13
【问题描述】:
我有 2 项服务(在 kotlin 上都有 spring boot)。将其命名为“客户端”和“服务器” 由于某些限制,我必须在 kafka 中使用同步请求-回复模式。所以我尝试使用ReplyingKafkaTemplate。我的问题是我需要用于多个实体。意味着创建多个ReplyingKafkaTemplate,一个用于“FOO”,第二个用于“BAR”。因此,在我的代码中,我创建了多个 KafkaConfig 类,并设置了每个实体和一个基本配置。
我还从加载中排除了 KafkaAutoConfig。下面是“服务器”端的配置(没有 kreplying kafka 模板):
@Configuration
@EnableKafka
class KafkaConfig @Autowired constructor(
@Value("\${kafka.bootstrap-servers}")
private var bootstrapServers: String,
@Value("\${kafka.consumer-group.name}")
private var consumerGroup: String,
@Value("\${kafka.consumer-group.id}")
private var groupId: Number
) {
@Bean("kafkaProducerConfig")
fun producerConfigs(): MutableMap<String, Any> {
return mutableMapOf(
Pair(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
Pair(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
Pair(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
Pair(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"),
Pair(ProducerConfig.ACKS_CONFIG, "all"),
Pair(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"),
Pair(ProducerConfig.RETRIES_CONFIG, Int.MAX_VALUE.toString()),
Pair(ProducerConfig.LINGER_MS_CONFIG, "20"),
Pair(ProducerConfig.BATCH_SIZE_CONFIG, (32 * 1024).toString()),
Pair(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
)
}
@Bean("kafkaConsumerConfig")
fun consumerConfigs(): Map<String, Any> {
return mutableMapOf(
Pair(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
Pair(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
Pair(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
Pair(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
Pair(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup + groupId)
)
}
@Bean("kafkaAdminConfig")
fun admin(): KafkaAdmin {
val configs: MutableMap<String, Any> = HashMap()
configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
return KafkaAdmin(configs)
}
}
一个用于 FOO(BAR 与 foo 相同的配置,而不是另一个实体和 bean 的命名)配置是:
@Configuration
class KafkaConfigForFOO {
@Value("\${kafka.topic.request-consumable-topic}")
private lateinit var requestConsumableTopic: String
@Value("\${kafka.request-reply.timeout-ms}")
private lateinit var replyTimeout: Number
@Bean("requestFOOTopicConfig")
fun requestConsumableTopic(): NewTopic {
val configs: MutableMap<String, String> = HashMap()
configs["retention.ms"] = replyTimeout.toString()
return NewTopic(requestConsumableTopic, 6, 3.toShort()).configs(configs)
}
@Bean("producerFactoryForFOO")
@Autowired
fun producerFactoryForFOO(@Qualifier("kafkaProducerConfig") producerConfigs: MutableMap<String, Any>):
ProducerFactory<String, FOO> = DefaultKafkaProducerFactory(producerConfigs)
@Bean("kafkaTemplateForFOO")
@Autowired
fun kafkaTemplateForFOO(@Qualifier("producerFactoryForFOO") producerFactory: ProducerFactory<String, FOO>):
KafkaTemplate<String, FOO> = KafkaTemplate(producerFactory)
@Bean("consumerFactoryForFOO")
@Autowired
fun consumerFactoryForFOO(@Qualifier("kafkaConsumerConfig") consumerConfigs: MutableMap<String, Any>):
ConsumerFactory<String, FOO> = DefaultKafkaConsumerFactory(consumerConfigs, StringDeserializer(), JsonDeserializer(FOO::class.java))
@Bean("kafkaListenerContainerFactoryForFOO")
@Autowired
fun kafkaListenerContainerFactoryForFOO(
@Qualifier("consumerFactoryForFOO") consumerFactory: ConsumerFactory<String, FOO>,
@Qualifier("kafkaTemplateForFOO") kafkaTemplate: KafkaTemplate<String, FOO>
):
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, FOO>> {
val factory = ConcurrentKafkaListenerContainerFactory<String, FOO>()
factory.consumerFactory = consumerFactory
factory.setReplyTemplate(kafkaTemplate)
return factory
}
}
还有带有 kafka 监听器的服务类:
@Component
class FOOReplyingKafkaConsumer @Autowired constructor(
private val fooService: FooService
) {
@KafkaListener(topics = ["\${kafka.topic.request-FOO-topic}"], containerFactory = "kafkaListenerContainerFactoryForFoo", groupId = "\${spring.kafka.consumer.group-id}")
@SendTo()
fun cropListen(request: FOO): FOO{
return FOO(fooService.getAllByIds(request.ids ?: mutableSetOf()).toMutableSet())
}
}
问题是如果我从共享的消费者/生产者配置中删除 bean 命名,spring 会在 kafkaTEmpalte 和 producerFactory 之间创建循环依赖关系。当我将其退回时,由于无法解析引导服务器,spring 看不到 kafka 道具并抛出Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers。
此外,如果我删除排除 KafkaAutoConfiguration,ContainerListener 无法知道女巫配置注入的异常抛出
使用带有一个模板的简单 java spring kafka 应用程序一切都很好。
【问题讨论】:
标签: java spring-boot kotlin apache-kafka spring-kafka