【问题标题】:Better way of error handling in Kafka ConsumerKafka Consumer 中更好的错误处理方式
【发布时间】:2020-11-23 22:16:12
【问题描述】:

我有一个配置了 spring-kafka 的 Springboot 应用程序,我想在其中处理听主题时可能发生的各种错误。如果由于反序列化或任何其他异常而丢失/无法使用任何消息,则将重试 2 次,然后应将消息记录到错误文件中。我有两种可以遵循的方法:-

第一种方法(将 SeekToCurrentErrorHandler 与 DeadLetterPublishingRecoverer 一起使用):-

@Autowired
KafkaTemplate<String,Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (r, e) -> {
                    if (e instanceof FooException) {
                        return new TopicPartition(r.topic() + ".DLT", r.partition());
                    }
                });
        ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));

        factory.setErrorHandler(errorHandler);
        return factory;
    }

但为此我们需要添加主题(一个新的 .DLT 主题),然后我们可以将其记录到文件中。

@Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
        return new KafkaAdmin(configs);
    }
    
@KafkaListener( topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
    @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {

    logger.error(exceptionStackTrace);
}

方法 2(使用自定义 SeekToCurrentErrorHandler):-

@Bean
    public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
        
        factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
        factory.setRetryTemplate(retryTemplate());
        return factory;
    }

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(aSimpleReturnPolicy);
}

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

private static final int MAX_RETRY_ATTEMPTS = 2;

CustomSeekToCurrentErrorHandler() {
    super(MAX_RETRY_ATTEMPTS);
}

@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
    try {
        if (!records.isEmpty()) {
            log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
            
            super.handle(exception, records, consumer, container);
        }
    } catch (SerializationException e) {
        log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
    }
}

}

任何人都可以就实现此类功能的标准方法提供建议。在第一种方法中,我们确实看到了创建 .DLT 主题和额外的 @KafkaListener 的开销。在第二种方法中,我们可以直接记录我们的消费者记录异常。

【问题讨论】:

    标签: java spring-boot kafka-consumer-api spring-kafka


    【解决方案1】:

    使用第一种方法,不需要使用DeadLetterPublishingRecoverer,您可以使用任何您想要的ConsumerRecordRecoverer;事实上,默认恢复器只是记录失败的消息。

    /**
     * Construct an instance with the default recoverer which simply logs the record after
     * the backOff returns STOP for a topic/partition/offset.
     * @param backOff the {@link BackOff}.
     * @since 2.3
     */
    public SeekToCurrentErrorHandler(BackOff backOff) {
        this(null, backOff);
    }
    

    而且,在FailedRecordTracker...

    if (recoverer == null) {
        this.recoverer = (rec, thr) -> {
            
            ...
    
            logger.error(thr, "Backoff "
                + (failedRecord == null
                    ? "none"
                    : failedRecord.getBackOffExecution())
                + " exhausted for " + ListenerUtils.recordToString(rec));
        };
    }
    

    在侦听器适配器中添加重试后,向错误处理程序添加了回退(以及重试限制),因此它是“更新的”(并且是首选的)。

    此外,如果使用长 BackOffs,则使用内存中重试可能会导致重新平衡问题。

    最后,只有SeekToCurrentErrorHandler 可以处理反序列化问题(通过ErrorHandlingDeserializer)。

    编辑

    ErrorHandlingDeserializerSeekToCurrentErrorHandler 一起使用。反序列化异常被认为是致命的,并立即调用恢复器。

    the documentation

    这是一个简单的 Spring Boot 应用程序来演示它:

    public class So63236346Application {
    
    
        private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);
    
    
        public static void main(String[] args) {
            SpringApplication.run(So63236346Application.class, args);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
        }
    
        @Bean
        ErrorHandler errorHandler() {
            return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
                    + ex.getMessage()));
        }
    
        @KafkaListener(id = "so63236346", topics = "so63236346")
        public void listen(String in) {
            System.out.println(in);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("so63236346", "{\"field\":\"value1\"}");
                template.send("so63236346", "junk");
                template.send("so63236346", "{\"field\":\"value2\"}");
            };
        }
    
    }
    
    package com.example.demo;
    
    public class Thing {
    
        private String field;
    
        public Thing() {
        }
    
        public Thing(String field) {
            this.field = field;
        }
    
        public String getField() {
            return this.field;
        }
    
        public void setField(String field) {
            this.field = field;
        }
    
        @Override
        public String toString() {
            return "Thing [field=" + this.field + "]";
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing
    

    结果

    Thing [field=value1]
    2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application   : so63236346-0@7
    Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
    2020-08-10 14:30:14.782  INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
    Thing [field=value2]
    

    【讨论】:

    • 感谢@GaryRussell 的回复。如果还有其他异常,例如 TimeoutException,该怎么办。忘记重试,只需要记录任何可能发生的异常,所以你认为只有 SeekToCurrentErrorHandler 就足够了吗?虽然我知道异常被认为是不可重试的,所以现在必须考虑退避策略吗?
    • 您可以通过addNotRetryableException() 为 STCEH 指定其他不可重试的异常 - 对于此类异常,将跳过重试并在第一次失败时调用恢复器。如果您不想重试 任何 异常,请添加一个简单的日志记录错误处理程序。
    • 感谢您的意见。有没有办法在下一次投票中跳过这些记录,因为在 STCEH 中处理的任何内容都将在下一次投票中出现。你有简单的日志错误处理程序的例子吗?
    • 那么@Gary Russell,我认为如果我们不必重试,KafkaListenerErrorHandler 就足够了
    • 默认回退是 0 延迟,重试 9 次(总共 10 次传递尝试),但 DeserializationExceptions 不会重试(请参阅 addNotRetryableException() 以了解默认的不可重试异常)。但是,您可以添加 new FixedBackOff(0L, 0L) 以从不重试任何异常。
    【解决方案2】:

    我们期望在容器级别和侦听器级别记录我们可能遇到的任何异常。

    不重试,以下是我完成错误处理的方式:-

    如果我们在容器级别遇到任何异常,我们应该能够记录带有错误描述的消息负载并寻找该偏移量并跳过它并继续接收下一个偏移量。虽然只针对 DeserializationException 执行,但其余异常也需要查找,并且需要跳过偏移量。

    @Component
    public class KafkaContainerErrorHandler implements ErrorHandler {
    
        private static final Logger logger = LoggerFactory.getLogger(KafkaContainerErrorHandler.class);
    
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
    
            // modify below logic according to your topic nomenclature
            String topics = s.substring(0, s.lastIndexOf('-'));
            int offset = Integer.parseInt(s.split("offset ")[1]);
            int partition = Integer.parseInt(s.substring(s.lastIndexOf('-') + 1).split(" at")[0]);
    
            logger.error("...")
            TopicPartition topicPartition = new TopicPartition(topics, partition);
            logger.info("Skipping {} - {} offset {}",  topics, partition, offset);
            consumer.seek(topicPartition, offset + 1);
        }
    
        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
    
        }
    }
    
    
     factory.setErrorHandler(kafkaContainerErrorHandler);
    

    如果我们在@KafkaListener 级别遇到任何异常,那么我将使用我的自定义错误处理程序配置我的侦听器并使用消息记录异常,如下所示:-

    @Bean("customErrorHandler")
        public KafkaListenerErrorHandler listenerErrorHandler() {
            return (m, e) -> {
                logger.error(...);
                return m;
            };
        }
    

    【讨论】:

    • 这不起作用 - 您将丢失列表中的剩余记录 - 您还需要在这些分区上执行搜索,以便在下一次轮询时重新获取它们。
    • 你指的是customErrorHandler,因为我正在寻找容器级别的分区并跳过那些发生异常的偏移量。
    • 是的;我只看到 1 次搜索(跳过反序列化异常);列表中的剩余记录(如果有)需要对剩余记录执行搜索(到每个分区的最低偏移量)。这正是 STCEH 所做的,所以不清楚为什么您需要自定义错误处理程序。
    • 我用 STCEH 尝试了 new SeekToCurrentErrorHandler((record, exception) -> { logger.error("..." ); }, 1);但它在无限循环中运行。我尝试将容器的并发也设置为 1,但仍然没有成功。当我不想有任何重试尝试时,我尝试将退避值设为 0,但对于任何异常,它仍然在无限循环中运行。这里有任何建议@Gary Russell
    • 我看到您缺少ErrorHandlingDeserializer - 我将在我的答案中添加一个示例。从异常中解析主题/分区可能有点脆弱。
    猜你喜欢
    • 2019-10-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-28
    • 1970-01-01
    • 2021-08-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多