【发布时间】:2019-01-28 04:53:56
【问题描述】:
我有一个 spring-kafka 消费者,它读取记录并将它们交给缓存。计划任务会定期清除缓存中的记录。我只想在批次成功保存在数据库中后更新 COMMIT OFFSET。我尝试将确认对象传递给缓存服务以调用确认方法,如下所示。
public class KafkaConsumer {
@KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
cacheService.add( record.value(), acknowledgment );
}
}
public class CacheService {
// concurrency handling has been left out in favor of readability
public void add( String record, Acknowledgment acknowledgment ) {
this.records.add(record);
this.lastAcknowledgment = acknowledgment;
}
public void saveBatch() { //called by scheduled task
if( records.size() == BATCH_SIZE ) {
// perform batch insert into database
this.lastAcknowledgment.acknowledge();
this.records.clear();
}
}
}
AckMode 设置如下:
factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
并且自动提交是假的:
config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
即使调用了确认方法,提交偏移量也不会更新。持久化记录后更新提交偏移量的最佳方法是什么?
我正在使用 spring-kafka 2.1.7.RELEASE。
编辑:在@GaryRussell confirmed 在下一次轮询期间由消费者线程执行外部线程作出的确认之后,我重新检查了我的代码,发现最后一个确认对象的错误放。 修复此问题后,提交偏移量按预期更新。所以这个问题已经解决了。但是,我无法将此问题标记为已回答。
【问题讨论】:
-
scheduled task是不同的线程吗?我相信这个lastAcknowledgment.acknowledge();应该在消费者线程上调用,你怎么定义偏移量没有提交? -
你的 spring-kafka 版本是什么?
-
是的,计划任务是一个不同的线程,因为我不想阻止消息的消费,而它们正在被持久化。使用命令行工具 kafka-consumer-groups.sh 可以看到 current-offset 没有更新。
-
它的 spring-kafka 2.1.7.RELEASE
-
你需要持有消费者线程
标签: java spring apache-kafka spring-kafka