【问题标题】:Update Kafka commit offset after successful batch insert批量插入成功后更新 Kafka 提交偏移量
【发布时间】:2019-01-28 04:53:56
【问题描述】:

我有一个 spring-kafka 消费者,它读取记录并将它们交给缓存。计划任务会定期清除缓存中的记录。我只想在批次成功保存在数据库中后更新 COMMIT OFFSET。我尝试将确认对象传递给缓存服务以调用确认方法,如下所示。

public class KafkaConsumer {
    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability
    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add(record);
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { //called by scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}

AckMode 设置如下:

factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );

并且自动提交是假的:

config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );

即使调用了确认方法,提交偏移量也不会更新。持久化记录后更新提交偏移量的最佳方法是什么?

我正在使用 spring-kafka 2.1.7.RELEASE。


编辑:在@GaryRussell confirmed 在下一次轮询期间由消费者线程执行外部线程作出的确认之后,我重新检查了我的代码,发现最后一个确认对象的错误放。 修复此问题后,提交偏移量按预期更新。所以这个问题已经解决了。但是,我无法将此问题标记为已回答。

【问题讨论】:

  • scheduled task 是不同的线程吗?我相信这个lastAcknowledgment.acknowledge();应该在消费者线程上调用,你怎么定义偏移量没有提交?
  • 你的 spring-kafka 版本是什么?
  • 是的,计划任务是一个不同的线程,因为我不想阻止消息的消费,而它们正在被持久化。使用命令行工具 kafka-consumer-groups.sh 可以看到 current-offset 没有更新。
  • 它的 spring-kafka 2.1.7.RELEASE
  • 你需要持有消费者线程

标签: java spring apache-kafka spring-kafka


【解决方案1】:

问题来了,消费者线程负责提交偏移量。在 poll 的时候消费者线程会提交上一个批次的偏移量。

因为在你的情况下AUTO_COMMIT 是假的,lastAcknowledgment.acknowledge() 是不承认偏移量是不提交。

只有一种方法可以做到这一点,一旦您获得轮询记录,将Schedule 任务设为Async 并持有消费者线程并在异步任务完成后提交偏移量,请查看此答案以供参考answer

注意如果您持有消费者线程超过 5 分钟,则会发生重新平衡here

新的 Java Consumer 现在支持来自后台线程的心跳。有一个新的配置 max.poll.interval.ms 控制在消费者主动离开组之前轮询调用之间的最长时间(默认为 5 分钟)。配置 request.timeout.ms 的值必须始终大于max.poll.interval.ms,因为这是消费者重新平衡时 JoinGroup 请求可以在服务器上阻塞的最长时间,因此我们将其默认值更改为略高于 5分钟。最后将session.timeout.ms的默认值调整为10秒,max.poll.records的默认值改为500。

特别说明来自 spring kafka >2.1.5

在下一次投票之前,消费者线程将执行对外部线程的确认感谢@Gary Russell 提供此信息

【讨论】:

  • 对“外部”线程的确认将由消费者线程在下一次 poll() 之前执行;见this commit
  • 但在这里我相信Ack 不会在下一个poll 之前发生,只要消费者线程空闲,它就会轮询下一个偏移量,对吧? @GaryRussell,这个提交来自2.1.5?
  • 没错;实际发生异步偏移提交的时间取决于提交 ack 时消费者线程的状态。如果它“卡在”poll() 中(并且没有更多记录可用),则提交将在轮询退出(超时)并且就在下一次轮询之前发生,如果轮询返回更多记录,则提交获胜'直到这些记录全部发送给侦听器后才会发生。 processCommits()poll() 之前被调用,并提交任何未决的偏移量(从异步线程提交的所有偏移量都添加到未决队列中)。是的,它是在 2.1.5 中添加的。
  • 我们别无选择——消费者不是线程安全的。为了让监听器拥有完全的控制权,确认必须在消费者线程上执行(然后使用 MANUAL_IMMEDIATE 以便立即发生提交)。
  • @GaryRussell - 非常感谢您确认外部线程做出的确认仍然由消费者线程执行。在此之后,我检查了我的代码并发现了如何设置最后一个确认的错误。修复此问题后,提交偏移量将如您所述更新。
猜你喜欢
  • 1970-01-01
  • 2017-03-17
  • 2018-06-08
  • 1970-01-01
  • 2021-11-14
  • 2020-06-04
  • 1970-01-01
  • 2022-06-11
  • 2020-09-06
相关资源
最近更新 更多