【问题标题】:Spring Kafka - "ErrorHandler threw an exception" and lost some recordsSpring Kafka - “ErrorHandler 抛出异常”并丢失了一些记录
【发布时间】:2022-01-18 15:05:50
【问题描述】:

Consumer一次轮询2条记录,即:

    @Bean
    ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = Map.of(
                BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                GROUP_ID_CONFIG, "my-consumers",
                AUTO_OFFSET_RESET_CONFIG, "earliest",
                MAX_POLL_RECORDS_CONFIG, 2);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
    }

ErrorHandler 可能无法处理错误记录:

class MyListenerErrorHandler implements ContainerAwareErrorHandler {

    @Override
    public void handle(Exception thrownException,
                       List<ConsumerRecord<?, ?>> records,
                       Consumer<?, ?> consumer,
                       MessageListenerContainer container) {
        simulateBugInErrorHandling(records.get(0));
        skipFailedRecord(); // seek offset+1, which never happens
    }

    private void simulateBugInErrorHandling(ConsumerRecord<?, ?> record) {
        throw new NullPointerException(
                "DB transaction failed when saving info about failure on offset = " + record.offset());
    }
}

那么这样的场景是可能的:

  1. 主题获得 3 条记录
  2. Consumer 一次轮询 2 条记录
  3. MessageListener 由于负载错误,无法处理第一条记录
  4. ErrorHandler 无法处理失败并且自身抛出异常,例如由于一些临时问题
  5. 第三条记录得到处理
  6. 从不处理第二条记录(从不输入MessageListener

ErrorHandler在上述情况下抛出异常时,如何确保没有未处理的记录?

我的目标是实现有延迟的有状态重试逻辑,但为简洁起见,我省略了负责跟踪失败记录和延迟重试的代码。


我希望在ErrorHandler 引发异常之后,不应该跳过整批记录。但确实如此。

  1. 这是正确的行为吗?
  2. 我是否应该手动处理使用 Spring/Kafka 默认值的提交?
  3. 我应该使用不同的ErrorHandlerhandle 方法吗? (我需要访问Container 来创建pause() 用于延迟重试逻辑;不能使用Thread.sleep()

不知何故相关问题:https://github.com/spring-projects/spring-kafka/issues/1265

完整代码:https://github.com/ptomaszek/spring-kafka-error-handler

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    消费者必须重新定位(使用搜索),以便在失败后重新获取记录。

    使用DefaultErrorHandler(2.8.x 及更高版本)或SeekToCurrentErrorHandler 与早期版本。

    您可以添加重试选项和恢复器来处理失败的记录;默认情况下它只是被记录。

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh

    https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-current

    在抛出任何异常之前,您需要先进行搜索(或在 finally 块中);如果错误处理程序抛出异常,容器不会提交偏移量。

    Kafka 维护 2 个偏移量——当前提交的偏移量和当前位置(在消费者启动时设置为提交的偏移量)。下一次轮询总是返回最后一次轮询之后的下一条记录。除非执行了搜索。

    默认错误处理程序会捕获恢复器抛出的任何异常,并确保当前(和后续)记录将由下一次轮询返回。见SeekUtils.doSeeks()

    【讨论】:

    • 我需要一个自定义的ErrorHandler。但是,它可以在使用seek 重新定位之前引发异常。然后似乎与records 的最新记录的偏移量无论如何都会与下一个poll 一起提交。最后,失败的记录(以及他第一包 records 中的剩余记录)永远不会被重新处理
    • 您需要先进行搜索(或在 finally 块中),然后才能引发任何异常;它不提交偏移量; Kafka 维护 2 个偏移量——当前提交的偏移量和当前位置(在消费者启动时设置为提交的偏移量)。下一次轮询总是返回最后一次轮询之后的下一条记录。除非执行搜索。默认错误处理程序捕获恢复器抛出的任何异常,并确保当前(和后续)记录将由下一次轮询返回。见SeekUtils.doSeeks()
    猜你喜欢
    • 2021-12-02
    • 2020-12-14
    • 2020-09-27
    • 2017-11-30
    • 2018-06-22
    • 2021-04-23
    • 2017-05-17
    • 1970-01-01
    • 2022-12-16
    相关资源
    最近更新 更多