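【Posted】: 2020-11-23 22:16:12
【Question】:
I have a Spring Boot application configured with spring-kafka, and I want to handle the various errors that can occur while listening to a topic. If a message is lost or cannot be consumed, whether because of a deserialization failure or any other exception, it should be retried 2 times, after which the message should be logged to an error file. There are two approaches I could follow:
Approach 1 (using SeekToCurrentErrorHandler with DeadLetterPublishingRecoverer):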
@Autowired
KafkaTemplate<String, Object> template;
@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource()
            .getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    // Publish records that exhaust their retries to "<topic>.DLT", same partition.
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
            (r, e) -> {
                if (e instanceof FooException) {
                    return new TopicPartition(r.topic() + ".DLT", r.partition());
                }
                // The resolver must return a destination on every path.
                // Assumption: all other exceptions go to the same .DLT topic.
                return new TopicPartition(r.topic() + ".DLT", r.partition());
            });
    // FixedBackOff(0L, 2L): no delay between attempts, at most 2 retries.
    ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
    factory.setErrorHandler(errorHandler);
    return factory;
}
However, this requires adding a topic (a new .DLT topic), which we can then consume from in order to log the failed messages to a file.
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
    return new KafkaAdmin(configs);
}
@KafkaListener(topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
        @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {
    logger.error(exceptionStackTrace);
}
Approach 2 (using a custom SeekToCurrentErrorHandler):
@Bean
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource()
            .getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
    factory.setRetryTemplate(retryTemplate());
    return factory;
}
private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(aSimpleReturnPolicy);
    return retryTemplate;
}
public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
    private static final int MAX_RETRY_ATTEMPTS = 2;
    CustomSeekToCurrentErrorHandler() {
        super(MAX_RETRY_ATTEMPTS);
    }
    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        try {
            if (!records.isEmpty()) {
                log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
                super.handle(exception, records, consumer, container);
            }
        } catch (SerializationException e) {
            log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
        }
    }
}
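Can anyone advise on the standard way to implement this kind of functionality? In the first approach, we see the overhead of creating a .DLT topic and an additional @KafkaListener. In the second approach, we can log the exception for the consumer record directly.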
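For reference, the behaviour both approaches are aiming for (two retries, then hand the failed record to something that logs it) can be sketched in plain Java with no Kafka dependency. This is only an illustration of the intended semantics, not spring-kafka code: the class RetryThenRecover, the method deliver and the in-memory errorLog list are hypothetical names invented for this sketch, and the list stands in for the error file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class RetryThenRecover {

    /**
     * Delivers one record to the listener, redelivering it up to maxRetries
     * times on failure. When every attempt has failed, the record and the
     * last exception are passed to the recoverer.
     * Returns the total number of delivery attempts made.
     */
    public static <T> int deliver(T record, int maxRetries,
                                  Consumer<T> listener,
                                  BiConsumer<T, Exception> recoverer) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts;               // processed successfully
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {   // initial attempt plus all retries failed
                    recoverer.accept(record, e);
                    return attempts;
                }
                // otherwise loop and redeliver the same record
            }
        }
    }

    public static void main(String[] args) {
        List<String> errorLog = new ArrayList<>(); // stand-in for the error file
        int attempts = deliver("bad-payload", 2,
                r -> { throw new IllegalStateException("cannot process " + r); },
                (r, e) -> errorLog.add(r + " failed: " + e.getMessage()));
        // prints "3 attempts, logged: [bad-payload failed: cannot process bad-payload]"
        System.out.println(attempts + " attempts, logged: " + errorLog);
    }
}
```

With 2 retries the record is delivered 3 times in total before the recoverer runs, which is my understanding of what FixedBackOff(0L, 2L) means in the first approach. The recoverer here plays the role that either the DLT publisher (approach 1) or a direct log statement (approach 2) would play.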
【Discussion】:
Tags: java spring-boot kafka-consumer-api spring-kafka