【发布时间】:2022-01-18 15:05:50
【问题描述】:
让Consumer一次轮询2条记录,即:
@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = Map.of(
BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
GROUP_ID_CONFIG, "my-consumers",
AUTO_OFFSET_RESET_CONFIG, "earliest",
MAX_POLL_RECORDS_CONFIG, 2);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
}
和ErrorHandler 可能无法处理错误记录:
class MyListenerErrorHandler implements ContainerAwareErrorHandler {
@Override
public void handle(Exception thrownException,
List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
simulateBugInErrorHandling(records.get(0));
skipFailedRecord(); // seek offset+1, which never happens
}
private void simulateBugInErrorHandling(ConsumerRecord<?, ?> record) {
throw new NullPointerException(
"DB transaction failed when saving info about failure on offset = " + record.offset());
}
}
那么这样的场景是可能的:
- 主题获得 3 条记录
-
Consumer一次轮询 2 条记录 -
MessageListener由于负载错误,无法处理第一条记录 -
ErrorHandler无法处理失败并且自身抛出异常,例如由于一些临时问题 - 第三条记录得到处理
- 从不处理第二条记录(从不输入
MessageListener)
当ErrorHandler在上述情况下抛出异常时,如何确保没有未处理的记录?
我的目标是实现有延迟的有状态重试逻辑,但为简洁起见,我省略了负责跟踪失败记录和延迟重试的代码。
我希望在ErrorHandler 引发异常之后,不应该跳过整批记录。但确实如此。
- 这是正确的行为吗?
- 我是否应该手动处理使用 Spring/Kafka 默认值的提交?
- 我应该使用不同的
ErrorHandler或handle方法吗? (我需要访问Container来创建pause()用于延迟重试逻辑;不能使用Thread.sleep())
不知何故相关问题:https://github.com/spring-projects/spring-kafka/issues/1265
完整代码:https://github.com/ptomaszek/spring-kafka-error-handler
【问题讨论】:
标签: spring-kafka