【发布时间】:2022-01-16 00:36:39
【问题描述】:
KafkaConsumerConfig.java
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pool);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return props;
}
public ConsumerFactory<String, MetadataFileIntegrationDTO> consumerFactoryMetadataFileIntegration() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(MetadataFileIntegrationDTO.class, false));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> kafkaListenerContainerFactoryMetadataFileIntegration() {
ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new KafkaErrorHandler());
factory.setMessageConverter(new StringJsonMessageConverter());
factory.setConsumerFactory(consumerFactoryMetadataFileIntegration());
return factory;
}
MetadataFileCustom.Java
@KafkaListener(topics = TOPIC,
groupId = GROUP,
containerFactory = "kafkaListenerContainerFactoryMetadataFileIntegration")
public void streamListener(MetadataFileIntegrationDTO metadataFileIntegrationDTO) {
log.info(TOPIC+ "===> RECEIVED MESSAGE:" + metadataFileIntegrationDTO);
metadataFileService.save(metadataFileIntegrationDTO);
}
如果我将 consumerFactoryMetadataFileIntegration 更改为
public ConsumerFactory consumerFactoryMetadataFileIntegration() {
return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(),
new StringDeserializer());
}
错误: 侦听器失败;嵌套异常是 java.lang.IllegalStateException: Only String, Bytes, or byte[] supported
【问题讨论】:
-
你已经有一个用 json 反序列化器定义的工厂 bean。为什么要使用字符串?
标签: java spring-boot apache-kafka spring-kafka