【Question title】: How to add JsonDeserializer to a Kafka consumer in Kotlin?
【Posted】: 2019-11-17 18:08:24
【Question】:

How do I add a JsonDeserializer to my Kafka consumer? At the moment I receive a JSON string, and I would like it converted into a data class object.

Here is my consumer:

Consumer configuration:

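import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.config.KafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer

@EnableKafka
@Configuration
class KafkaConsumerConfig {
    @Value("\${kafka.host:localhost}")
    private val host: String? = null

    @Value("\${kafka.port:9092}")
    private val port: Int = 0

    // Keys and values are both read as plain strings for now.
    @Bean
    fun userConsumerConfigs(): ConsumerFactory<String, String> {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "$host:$port"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.GROUP_ID_CONFIG] = "helloworld"
        return DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
    }

    @Bean
    fun kafkaListenerContainerFactory():
            KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = userConsumerConfigs()
        return factory
    }

    @Bean
    fun kafkaConsumer(): Consumerz {
        return Consumerz()
    }

    companion object {
        const val TOPIC = "test"
    }
}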

Consumer:

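import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener

class Consumerz {
    // The payload arrives as a raw JSON string.
    @KafkaListener(topics = ["usertest"])
    fun receive(message: String) {
        LOGGER.info("Received payload= $message")
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(Consumerz::class.java)
    }
}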

At the moment I only use StringDeserializer. How can I use JsonDeserializer in this case?

Current output: Received payload= { "firstName": "Jack", "lastName" : "Adam" }

Thanks in advance.

Note: this is a Spring project.

【Comments】:

    Tags: spring kotlin apache-kafka


    【Answer 1】:

    You can use org.springframework.kafka.support.serializer.JsonDeserializer directly, as follows:

    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    

    The consumer configuration would then be:

    private ConsumerFactory<String, MyDomainModel> myMessageFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KAPP");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // The deserializer instances passed here are the ones actually used;
        // the JsonDeserializer is told to map each value to MyDomainModel.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyDomainModel.class));
    }

    // Listeners must reference this factory by bean name (containerFactory attribute of @KafkaListener).
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyDomainModel> myMessageListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyDomainModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(myMessageFactory());
        return factory;
    }
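
Since the question is about Kotlin, here is a rough Kotlin equivalent applied to the configuration from the question. It is a sketch under a few assumptions, not a drop-in solution: the `User` data class is hypothetical (its fields are taken from the payload shown in the question), the bean method name `userConsumerFactory` is made up for illustration, and `jackson-module-kotlin` is assumed to be on the classpath so that Jackson can construct a Kotlin data class.

```kotlin
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

// Hypothetical data class; field names mirror the JSON payload in the question.
data class User(val firstName: String, val lastName: String)

@EnableKafka
@Configuration
class KafkaConsumerConfig {
    @Value("\${kafka.host:localhost}")
    private lateinit var host: String

    @Value("\${kafka.port:9092}")
    private var port: Int = 0

    @Bean
    fun userConsumerFactory(): ConsumerFactory<String, User> {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "$host:$port"
        props[ConsumerConfig.GROUP_ID_CONFIG] = "helloworld"

        // Assumption: jacksonObjectMapper() comes from jackson-module-kotlin and
        // registers the Kotlin module needed to instantiate data classes.
        val valueDeserializer = JsonDeserializer<User>(User::class.java, jacksonObjectMapper())

        // Keys stay plain strings; values are mapped from JSON to User.
        return DefaultKafkaConsumerFactory(props, StringDeserializer(), valueDeserializer)
    }

    // Named kafkaListenerContainerFactory so @KafkaListener picks it up by default.
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, User> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, User>()
        factory.consumerFactory = userConsumerFactory()
        return factory
    }

    @Bean
    fun kafkaConsumer(): Consumerz = Consumerz()
}

class Consumerz {
    // The listener now receives the deserialized object instead of a JSON string.
    @KafkaListener(topics = ["usertest"])
    fun receive(user: User) {
        LOGGER.info("Received user= $user")
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(Consumerz::class.java)
    }
}
```

Two things to watch for if you adapt this. If the container factory bean is given another name, as in the Java snippet above, the listener has to reference it explicitly, for example `@KafkaListener(topics = ["usertest"], containerFactory = "myMessageListenerContainerFactory")`. And if the producer attaches type headers (Spring's JsonSerializer can do this), the deserializer may reject the class until its package is trusted, which `addTrustedPackages` on the JsonDeserializer instance can address.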
    

    【Comments】:
