【问题标题】:Retrieve both payload and headers of a Kafka message that could not be deserialized检索无法反序列化的 Kafka 消息的有效负载和标头
【发布时间】:2018-10-31 02:50:57
【问题描述】:

我正在使用 spring-kafka 2.1.7 来消费 JSON 消息,我想处理无法正确反序列化的消息。
为了覆盖循环相同消息的默认行为,我扩展了 JsonDeserializer 以覆盖反序列化方法。

public class CustomKafkaJsonDeserializer<T> extends JsonDeserializer<T> {

    public CustomKafkaJsonDeserializer(Class<T> targetType) {
        super(targetType);
        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return super.deserialize(topic, data);
        } catch (Exception e) {
            log.error("Problem deserializing data " + new String(data) + " on topic " + topic, e.getMessage());
            return null;
        }
    }

}

这是我的消费者及其配置:

@Service
public class Consumer {

    @KafkaListener(topics = "${kafka.topic.out}", containerFactory = "kafkaListenerContainerFactory", errorHandler = "customKafkaListenerErrorHandler")
    public void consume(@Payload Lines lines, @Headers MessageHeaders messageHeaders) {
        //treatment
    }

}

@Configuration
public class ConsumerConfig {

    ...

    @Bean
    public ConsumerFactory<String, Lines> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new CustomKafkaJsonDeserializer<>(Lines.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Lines>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Lines> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("group.id", this.appName);
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", CustomKafkaJsonDeserializer.class);
        props.put("security.protocol", this.securityProtocol);
        props.put("sasl.mechanism", this.saslMechanism);
        props.put("sasl.jaas.config", this.saslJaasConfig);
        return props;
    }
}

最后,我实现了自己的错误处理程序,以便将错误数据发送到其他主题。

@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

    @Autowired
    private KafkaErrorService kafkaErrorService;

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception {
        log.error("error handler for message: {} [{}], exception: {}", message.getPayload(), message.getHeaders(), exception.getMessage());
        kafkaErrorService.sendErrorToKafka(message.getPayload().toString(), exception.getMessage());
        throw new RuntimeException(exception);
    }

}

当我使用错误消息时会发生这种情况:

  • CustomKafkaJsonDeserializer 尝试反序列化消息并捕获异常。
  • 可以在 catch 块中检索有效负载,但不能在标头中检索。返回 null 以提前偏移量。
  • 它进入错误处理程序的handleError 方法。 message.getHeaders() 返回正确的标头,但 message.getPayload() 返回一个 KafkaNull 对象。因此,我无法在此步骤同时发送有效负载和标头。

关于如何实现这一点的任何建议?

【问题讨论】:

    标签: java apache-kafka spring-kafka


    【解决方案1】:

    返回一个包含数据和标题的丰富对象,而不是返回 null。

    【讨论】:

    • 在 2.2 中,我们有一个新的 ErrorHandlingDeserializer 但它不处理标题;我们将在 2.2.1 中修复它:github.com/spring-projects/spring-kafka/issues/845 – Gary Russell 11 分钟前
    • 确实,我有一个 KafkaNull,因为返回了 null。我实施了建议的解决方案,但找不到干净的方法。有趣的是,对于将 JsonSerializer 作为值序列化器(而不是 StringSerializer)的生产者,我在错误处理程序中正确检索了有效负载和标头。
    猜你喜欢
    • 1970-01-01
    • 2022-11-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-06-29
    • 2021-09-14
    • 2013-12-07
    • 2019-08-15
    相关资源
    最近更新 更多