【问题标题】:configuring ReplyingKafkaTemplate for multiple entities为多个实体配置 ReplyingKafkaTemplate
【发布时间】:2020-09-12 20:27:13
【问题描述】:

我有 2 项服务(在 kotlin 上都有 spring boot)。将其命名为“客户端”和“服务器” 由于某些限制,我必须在 kafka 中使用同步请求-回复模式。所以我尝试使用ReplyingKafkaTemplate。我的问题是我需要用于多个实体。意味着创建多个ReplyingKafkaTemplate,一个用于“FOO”,第二个用于“BAR”。因此,在我的代码中,我创建了多个 KafkaConfig 类,并设置了每个实体和一个基本配置。

我还从加载中排除了 KafkaAutoConfig。下面是“服务器”端的配置(没有 kreplying kafka 模板):

@Configuration
@EnableKafka
class KafkaConfig @Autowired constructor(
    @Value("\${kafka.bootstrap-servers}")
    private var bootstrapServers: String,
    @Value("\${kafka.consumer-group.name}")
    private var consumerGroup: String,
    @Value("\${kafka.consumer-group.id}")
    private var groupId: Number
) {

    @Bean("kafkaProducerConfig")
    fun producerConfigs(): MutableMap<String, Any> {
        return mutableMapOf(
            Pair(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
            Pair(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
            Pair(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
            Pair(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"),
            Pair(ProducerConfig.ACKS_CONFIG, "all"),
            Pair(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"),
            Pair(ProducerConfig.RETRIES_CONFIG, Int.MAX_VALUE.toString()),
            Pair(ProducerConfig.LINGER_MS_CONFIG, "20"),
            Pair(ProducerConfig.BATCH_SIZE_CONFIG, (32 * 1024).toString()),
            Pair(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
        )
    }

    @Bean("kafkaConsumerConfig")
    fun consumerConfigs(): Map<String, Any> {
        return mutableMapOf(
            Pair(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9093,kafka3:9094"),
            Pair(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer::class.java),
            Pair(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer::class.java),
            Pair(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
            Pair(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup + groupId)
        )
    }

    @Bean("kafkaAdminConfig")
    fun admin(): KafkaAdmin {
        val configs: MutableMap<String, Any> = HashMap()
        configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
        return KafkaAdmin(configs)
    }
}

一个用于 FOO(BAR 与 foo 相同的配置,而不是另一个实体和 bean 的命名)配置是:

@Configuration
class KafkaConfigForFOO {

    @Value("\${kafka.topic.request-consumable-topic}")
    private lateinit var requestConsumableTopic: String

    @Value("\${kafka.request-reply.timeout-ms}")
    private lateinit var replyTimeout: Number

    @Bean("requestFOOTopicConfig")
    fun requestConsumableTopic(): NewTopic {
        val configs: MutableMap<String, String> = HashMap()
        configs["retention.ms"] = replyTimeout.toString()
        return NewTopic(requestConsumableTopic, 6, 3.toShort()).configs(configs)
    }

    @Bean("producerFactoryForFOO")
    @Autowired
    fun producerFactoryForFOO(@Qualifier("kafkaProducerConfig") producerConfigs: MutableMap<String, Any>):
        ProducerFactory<String, FOO> = DefaultKafkaProducerFactory(producerConfigs)

    @Bean("kafkaTemplateForFOO")
    @Autowired
    fun kafkaTemplateForFOO(@Qualifier("producerFactoryForFOO") producerFactory: ProducerFactory<String, FOO>):
        KafkaTemplate<String, FOO> = KafkaTemplate(producerFactory)

    @Bean("consumerFactoryForFOO")
    @Autowired
    fun consumerFactoryForFOO(@Qualifier("kafkaConsumerConfig") consumerConfigs: MutableMap<String, Any>):
        ConsumerFactory<String, FOO> = DefaultKafkaConsumerFactory(consumerConfigs, StringDeserializer(), JsonDeserializer(FOO::class.java))

    @Bean("kafkaListenerContainerFactoryForFOO")
    @Autowired
    fun kafkaListenerContainerFactoryForFOO(
        @Qualifier("consumerFactoryForFOO") consumerFactory: ConsumerFactory<String, FOO>,
        @Qualifier("kafkaTemplateForFOO") kafkaTemplate: KafkaTemplate<String, FOO>
    ):
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, FOO>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, FOO>()
        factory.consumerFactory = consumerFactory
        factory.setReplyTemplate(kafkaTemplate)
        return factory
    }
}

还有带有 kafka 监听器的服务类:

@Component
class FOOReplyingKafkaConsumer @Autowired constructor(
    private val fooService: FooService
) {
    @KafkaListener(topics = ["\${kafka.topic.request-FOO-topic}"], containerFactory = "kafkaListenerContainerFactoryForFoo", groupId = "\${spring.kafka.consumer.group-id}")
    @SendTo()
    fun cropListen(request: FOO): FOO{
        return FOO(fooService.getAllByIds(request.ids ?: mutableSetOf()).toMutableSet())
    }
}

问题是如果我从共享的消费者/生产者配置中删除 bean 命名,spring 会在 kafkaTEmpalte 和 producerFactory 之间创建循环依赖关系。当我将其退回时,由于无法解析引导服务器,spring 看不到 kafka 道具并抛出Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

此外,如果我删除排除 KafkaAutoConfiguration,ContainerListener 无法知道女巫配置注入的异常抛出

使用带有一个模板的简单 java spring kafka 应用程序一切都很好。

【问题讨论】:

    标签: java spring-boot kotlin apache-kafka spring-kafka


    【解决方案1】:

    您不需要两个消费者工厂;类型擦除意味着它在运行时无关紧要。

    boot配置一为

    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
    

    实际上是 &lt;Object, Object&gt;(或 Kotlin 中的 &lt;Any, Any&gt;)。

    两个监听器可以使用同一个工厂。

    由于您使用的是 JSON 反序列化器;类型由发送端设置的标头确定。

    与模板相同。

    public KafkaTemplate<?, ?> kafkaTemplate
    

    可以使用不同的泛型类型或&lt;Object, Object&gt; 多次注入。

    【讨论】:

      【解决方案2】:

      感谢 Gary Russell 的正确指导。我删除了除生产者配置和 kafkaTemplate 之外的所有配置。使用所需类型自动装配到 kafkaTemplate 默认的 Spring Boot 配置中。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-08-05
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-08-23
        • 1970-01-01
        • 2012-03-24
        相关资源
        最近更新 更多