【问题标题】:spring Kafka integration test listener not working (KAFKA JsonDeserializer)春季卡夫卡集成测试监听器不工作(KAFKA JsonDeserializer)
【发布时间】:2022-01-16 00:36:39
【问题描述】:

KafkaConsumerConfig.java

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pool);



    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);

    return props;
}

public ConsumerFactory<String, MetadataFileIntegrationDTO> consumerFactoryMetadataFileIntegration() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(MetadataFileIntegrationDTO.class, false));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> kafkaListenerContainerFactoryMetadataFileIntegration() {
    ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new KafkaErrorHandler());
    factory.setMessageConverter(new StringJsonMessageConverter());
    factory.setConsumerFactory(consumerFactoryMetadataFileIntegration());

    return factory;
}

MetadataFileCustom.Java

@KafkaListener(topics = TOPIC,
        groupId = GROUP,
        containerFactory = "kafkaListenerContainerFactoryMetadataFileIntegration")
public void streamListener(MetadataFileIntegrationDTO metadataFileIntegrationDTO) {
    log.info(TOPIC+ "===> RECEIVED MESSAGE:" + metadataFileIntegrationDTO);
    metadataFileService.save(metadataFileIntegrationDTO);
}

如果我将 consumerFactoryMetadataFileIntegration 更改为

public ConsumerFactory consumerFactoryMetadataFileIntegration() {
    return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(),
            new StringDeserializer());
}

有效,但声纳抱怨..

错误: 侦听器失败;嵌套异常是 java.lang.IllegalStateException: Only String, Bytes, or byte[] supported

【问题讨论】:

  • 你已经有一个用 json 反序列化器定义的工厂 bean。为什么要使用字符串?

标签: java spring-boot apache-kafka spring-kafka


【解决方案1】:

您可以想象使用 MessageConverter 的消费者流程如下:

  1. 本机反序列化器(在您的情况下为StringDeserializer)将byte[] 消息反序列化为String 消息。

  2. Consumer.poll() 返回这些 String 消息。

  3. 您的 MessageConverter (StringJsonMessageConverter) 将这些 String 消息转换为您的类型 MetadataFileIntegrationDTO(由 @KafkaListener 中的参数确定)


因此,当您将本机反序列化器定义为JsonDeserializer(对应于ConsumerFactory&lt;String, MetadataFileIntegrationDTO&gt;)时,consumer.poll() 返回了MetadataFileIntegrationDTO 消息,而这不是StringJsonMessageConverter 可以处理的类型(您可以请参阅仅支持字符串、字节或字节[]

当您将JsonDeserializer 更改为StringDeserializer 时,对应的ConsumerFactoryConsumerFactory&lt;String, String&gt;。这意味着当你从这个ConsumerFactory 创建一个新的Consumer 时,consumer.poll() 返回String

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-06-18
    • 1970-01-01
    • 2018-03-13
    • 2021-11-30
    • 2018-01-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多