【问题标题】:RetryingBatchErrorHandler - Offset commit handlingRetryingBatchErrorHandler - 偏移提交处理
【发布时间】:2021-10-13 00:25:10
【问题描述】:

我正在使用 spring-kafka 2.3.8,我正在尝试记录恢复的记录并使用 RetryingBatchErrorHandler 提交偏移量。您将如何在恢复器中提交偏移量?

public class Customizer implements ContainerCustomizer{
    private static ConsumerRecordRecoverer createConsumerRecordRecoverer() {
        return (consumerRecord, e) -> {
            log.info("Number of attempts exhausted. parition: " consumerRecord.partition() + ", offset: " + consumerRecord.offset());
         # need to commit the offset
         };
    }

    @Override
    public void configure(AbstractMessageListenerContainer container) {
        container.setBatchErrorHandler(new RetryingBatchErrorHandler(new FixedBackOff(5000L, 3L), createConsumerRecordRecoverer()));
    }

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    如果错误处理程序“处理”异常,容器将自动提交偏移量,除非您将 ackAfterHandle 属性设置为 false(默认为 true)。

    编辑

    这对我来说按预期工作:

    @SpringBootApplication
    public class So69534923Application {
    
        private static final Logger log = LoggerFactory.getLogger(So69534923Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So69534923Application.class, args);
        }
    
        @KafkaListener(id = "so69534923", topics = "so69534923")
        void listen(List<String> in) {
            System.out.println(in);
            throw new RuntimeException("test");
        }
    
        @Bean
        RetryingBatchErrorHandler eh() {
            return new RetryingBatchErrorHandler(new FixedBackOff(1000L, 2), (rec, ex) -> {
                this.log.info("Retries exchausted for " + ListenerUtils.recordToString(rec, true));
            });
        }
    
        @Bean
        ApplicationRunner runner(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
                KafkaTemplate<String, String> template) {
    
            factory.getContainerProperties().setCommitLogLevel(Level.INFO);
            return args -> {
                template.send("so69534923", "foo");
                template.send("so69534923", "bar");
            };
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.listener.type=batch
    
    so69534923: partitions assigned: [so69534923-0]
    [foo, bar]
    [foo, bar]
    [foo, bar]
    Retries exchausted for so69534923-0@2
    Retries exchausted for so69534923-0@3
    Committing: {so69534923-0=OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''}}
    

    日志来自第二次运行。

    EDIT2

    它不适用于 2.3.x;您应该升级到受支持的版本。

    https://spring.io/projects/spring-kafka#learn

    【讨论】:

    • 我想在恢复器中记录记录与处理异常不一样?我不需要抛出异常来调用恢复器吗? (P.S. ackAfterHandle 没有设置为 false。)
    • 通过“句柄”,我的意思是如果错误处理程序本身没有抛出异常,则认为它已被处理并提交了偏移量。见github.com/spring-projects/spring-kafka/blob/…github.com/spring-projects/spring-kafka/blob/…
    • &gt;Don't I need to throw an exception in order to invoke the recoverer? ?当重试用尽时调用恢复器。
    • 但我不能“吃掉”我的记录处理器中的异常。我需要把它们扔到容器里。对吗?
    • 只有当监听器抛出异常时才会调用错误处理程序。该批次根据重试配置重新提交给监听器;当重试用尽时,调用恢复器并退出错误处理程序,容器提交偏移量。
    猜你喜欢
    • 2020-12-28
    • 2017-03-17
    • 1970-01-01
    • 2018-06-08
    • 1970-01-01
    • 1970-01-01
    • 2017-12-10
    • 2018-12-17
    • 1970-01-01
    相关资源
    最近更新 更多