【问题标题】:How to set change the retry attempts based of the exception type in Kafka SeekToCurrentErrorHandler?如何根据 Kafka SeekToCurrentErrorHandler 中的异常类型设置更改重试次数?
【发布时间】:2021-02-18 20:46:15
【问题描述】:

我有一个 Spring Boot 应用程序,我正在尝试创建一个 Kafka 重试侦听器,其中根据异常类型我需要更改重试尝试。

例如:

如果异常类型为 A,则重试次数应为 5
如果异常类型为 B,则重试次数应为 10

谁能推荐在 Spring Kafka 中如何做到这一点?

以下是我的 ListenerFactory。我正在使用 SeekToCurrentErrorHandler

Bean
    public ConcurrentKafkaListenerContainerFactory<String, test> retryListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, test> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            System.out.println(
                    "RetryPolicy** limit has been exceeded! You should really handle this better." + record.key());
        }, new FixedBackOff(0L, 3L));
        errorHandler.addNotRetryableException(IllegalArgumentException.class);
        errorHandler.setCommitRecovered(true);
        factory.setErrorHandler(errorHandler);
        // to keep the consumers alive the failure gets reported to broker so that
        // consumers remain alive
        factory.setStatefulRetry(true);
        factory.setConcurrency(2);
        return factory;
    }

【问题讨论】:

    标签: java spring-boot apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    从 2.6 版开始,您可以添加一个函数来根据消费者记录和/或异常情况确定要使用的 BackOff

    /**
     * Set a function to dynamically determine the {@link BackOff} to use, based on the
     * consumer record and/or exception. If null is returned, the default BackOff will be
     * used.
     * @param backOffFunction the function.
     * @since 2.6
     */
    public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
        this.failureTracker.setBackOffFunction(backOffFunction);
    }
    

    仅当此记录没有当前的BackOff 时才会调用它(即,它对您的other question 没有帮助)。

    【讨论】:

    • 我打开了一个问题,允许在异常类型更改时重置故障跟踪器:github.com/spring-projects/spring-kafka/issues/1610
    • 非常感谢@GaryRussell!我还有一个问题,有没有办法可以设置否。 ExpoentialBackOff 的重试次数?我的要求是更改号码。基于异常类型的重试次数,并且还会导致每次重试之间呈指数级延迟。你能建议一下吗?现在我正在这样做。 errorHandler.setBackOffFunction((record,exception)-> { if(msg.getNewError().equals(errorCodes.getApiError()) return new FixedBackOff(0L, apiRetryCount); else return new FixedBackOff(0L, retryCount ); });
    • 您正在使用FixedBackOff,重试之间没有延迟。请改用ExpontialBackOff - new ExponentialBackOff(initial, multiplier)。您还可以设置最大间隔以增加尝试之间的延迟上限,并设置setMaxElapsedTime 以停止重试(而不是重试计数)(您可以根据重试计数和其他设置来计算)。带有新功能的2.6.3将于下周发布,
    • 谢谢。还有一个问题。在所有重试都用尽的恢复中,我需要将消息放入错误主题。我可以为此目的使用 DealLetterPublishingRecoverer 吗?
    • 是的。这就是它的用途。
    猜你喜欢
    • 1970-01-01
    • 2021-08-23
    • 2021-04-12
    • 2014-11-18
    • 2021-02-18
    • 1970-01-01
    • 2019-09-29
    • 1970-01-01
    • 2017-10-24
    相关资源
    最近更新 更多