【发布时间】:2019-11-17 18:08:24
【问题描述】:
如何将 JsonDeserializer 添加到我的 Kafka 消费者?我现在收到的是一个 JSON 字符串,希望将其转换为数据类(data class)对象。
这是我的消费者:
消费者配置:
/**
 * Spring configuration for the Kafka consumer side of the application.
 *
 * Declares the consumer factory, the listener container factory built on top of it,
 * and the [Consumerz] listener bean. Keys and values are both consumed as plain strings.
 */
@EnableKafka
@Configuration
class KafkaConsumerConfig {

    // NOTE(review): both properties are `val`s that rely on Spring's reflective field
    // injection to receive their values; the initializers below are placeholders only.
    // Confirm this works with the project's Spring/Kotlin versions.

    /** Broker host, taken from the `kafka.host` property (placeholder default `localhost`). */
    @Value("\${kafka.host:localhost}")
    private val host: String? = null

    /** Broker port, taken from the `kafka.port` property (placeholder default `9092`). */
    @Value("\${kafka.port:9092}")
    private val port: Int = 0

    /**
     * Creates the consumer factory used by the listener containers.
     *
     * @return a factory producing consumers with `String` keys and `String` values,
     * pointed at `host:port` and joined to the `helloworld` consumer group.
     */
    @Bean
    fun userConsumerConfigs(): ConsumerFactory<String, String> {
        val props: Map<String, Any> = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "$host:$port",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.GROUP_ID_CONFIG to "helloworld",
        )
        // Deserializer instances are also handed to the factory explicitly, mirroring
        // the class-based settings in the property map above.
        return DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
    }

    /**
     * Creates the container factory that backs `@KafkaListener` methods.
     *
     * @return a concurrent listener container factory wired to [userConsumerConfigs].
     */
    @Bean
    fun kafkaListenerContainerFactory():
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = userConsumerConfigs()
        return factory
    }

    /** Registers the [Consumerz] listener as a bean. */
    @Bean
    fun kafkaConsumer(): Consumerz = Consumerz()

    companion object {
        /** Topic name constant; it is not referenced anywhere else in this class. */
        const val TOPIC = "test"
    }
}
消费者:
/**
 * Kafka listener that logs every message received on the `usertest` topic.
 */
class Consumerz {

    /**
     * Handles a single record from the `usertest` topic by writing it to the log at INFO level.
     *
     * @param message the record value as delivered to the listener.
     */
    @KafkaListener(topics = ["usertest"])
    fun receive(message: String) = log.info("Received payload= $message")

    companion object {
        /** Logger bound to this class. */
        private val log = LoggerFactory.getLogger(Consumerz::class.java)
    }
}
我目前只使用了 StringDeserializer。在这种情况下,我该如何实现 JsonDeserializer?
当前输出为:Received payload= { "firstName": "Jack", "lastName" : "Adam" }
提前致谢。
这是一个 Spring 项目。
【问题讨论】:
标签: spring kotlin apache-kafka