【发布时间】:2021-02-18 20:46:15
【问题描述】:
我有一个 Spring Boot 应用程序,我正在尝试创建一个 Kafka 重试侦听器,其中根据异常类型我需要更改重试尝试。
例如:
如果异常类型为 A,则重试次数应为 5
如果异常类型为 B,则重试次数应为 10
谁能推荐在 Spring Kafka 中如何做到这一点?
以下是我的 ListenerFactory。我正在使用 SeekToCurrentErrorHandler
Bean
public ConcurrentKafkaListenerContainerFactory<String, test> retryListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, test> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
System.out.println(
"RetryPolicy** limit has been exceeded! You should really handle this better." + record.key());
}, new FixedBackOff(0L, 3L));
errorHandler.addNotRetryableException(IllegalArgumentException.class);
errorHandler.setCommitRecovered(true);
factory.setErrorHandler(errorHandler);
// to keep the consumers alive the failure gets reported to broker so that
// consumers remain alive
factory.setStatefulRetry(true);
factory.setConcurrency(2);
return factory;
}
【问题讨论】:
标签: java spring-boot apache-kafka kafka-consumer-api spring-kafka