【问题标题】:Error when creating a topic with partition choosed on Spring Kafka在 Spring Kafka 上创建带有选择的分区的主题时出错
【发布时间】:2020-02-02 01:16:00
【问题描述】:

我正在通过 Kotlin 中的 Spring Kafka 学习使用 Kafka。我了解到,当发布新主题时,如果不存在,则会创建它。因此,当我向从 Spring 创建的新/旧主题发送值时,默认分区为 0,但我想在另一个分区上写入消息,例如分区 1。

当我创建/写一个主题时,它是有效的:

val topicTesteKotlin = "topico-teste-kotlin"

fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
        val msg = Optional.of(message)
        return if (msg.isPresent) {
            kafkaTemplate.send(topicTesteKotlin, message).addCallback({
                println("Sent message=[" + message +
                        "] with offset=[" + it!!.recordMetadata.offset() + "]")
            }, {
                println("Unable to send message=["
                        + message + "] due to : " + it.message)
            })
            ResponseEntity.ok(msg.get())
        } else {
            kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
            ResponseEntity.badRequest().body("Bad request!")
        }
    }

但是,当我选择分区和密钥时:

val topicTesteKotlin = "topico-teste-kotlin"

fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
        val msg = Optional.of(message)
        return if (msg.isPresent) {
            kafkaTemplate.send(topicTesteKotlin, 1, "1", message).addCallback({
                println("Sent message=[" + message +
                        "] with offset=[" + it!!.recordMetadata.offset() + "]")
            }, {
                println("Unable to send message=["
                        + message + "] due to : " + it.message)
            })
            ResponseEntity.ok(msg.get())
        } else {
            kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
            ResponseEntity.badRequest().body("Bad request!")
        }
    }

我收到以下错误:

org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).

我尝试将密钥更改为0.1,但也不起作用。显然,当我从 Spring 客户端创建一个主题时,只创建了一个分区,即0

Kafka 生产者配置

@Configuration
class KafkaProducerConfig {

    @Bean
    fun producerFactory() : ProducerFactory<String, String> {
        val configProps = HashMap<String,Any>()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate() : KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

}

那么,我如何从 Spring Kafka 客户端创建分区?

【问题讨论】:

    标签: spring kotlin apache-kafka spring-kafka kafka-producer-api


    【解决方案1】:

    您可以使用以下代码管理主题创建机制:

    @Configuration
    public class KafkaTopicConfig {
        @Value(value = "${kafka.bootstrapAddress}")
        private String bootstrapAddress;
        private String testTopicName = "topico-teste-kotlin";
    
        @Bean
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,       bootstrapAddress);
        return new KafkaAdmin(configs);
        }
       @Bean
        public NewTopic testTopic() {
            // second parameter is a number of partitions
            return new NewTopic(testTopicName, 2, (short) 1);
        }
    
    }
    

    【讨论】:

    • 谢谢,它的作品。我刚刚在我的配置生成器上添加了最后一个函数。你可以告诉我如何检查@KafkaListener fun listenWithHeaders(@Payload message : String, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) partition : Int)函数上阅读的主题?
    • 抱歉,我不完全明白您想在这里做什么?如果您只想读取特定分区,您可以指定 @KafkaListener(topicPartitions="") 如果您需要知道当前 ConsumerRecord 的分区,您可以使用 @KafkaListener(topics = "myTopic") public void listen(ConsumerRecord, ? > cr) 抛出异常 {cr.partition();}
    猜你喜欢
    • 2018-08-05
    • 1970-01-01
    • 2019-02-25
    • 1970-01-01
    • 2018-10-20
    • 1970-01-01
    • 2017-07-21
    • 2017-07-31
    相关资源
    最近更新 更多