【问题标题】:I am confused on principle of acknowledgment我对承认原则感到困惑
【发布时间】:2021-03-12 03:36:12
【问题描述】:

版本:2.1.11

问题: 我有一些错误的参数配置: 最大投票记录:500 max.poll.interval.ms:10000。 但消耗大约需要 25000 秒。所以它会抛出异常: org.apache.kafka.clients.consumer.CommitFailedException:提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加会话超时或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。

然后我更正了参数,这样这个错误就解决了。

但是,我很困惑,我在消费一条消息时已通过 ack.acknowledge() 提交。 为什么根本无法提交偏移量? 这是代码

@KafkaListener(topics = "${kafka.topic}")
public void consume(ConsumerRecord<String, SyncResMessage> record, Acknowledgment ack) {
    
    try {
        // consume message
        consumerService.dealResource(record.value());
    } catch (Exception e) {
        LOGGER.error("error when consume data, data key is {}, exception is {}.", record.key(), e);
    }

    if (ack != null) {
        LOGGER.info("commit successfully.");
        ack.acknowledge();
    } else {
        LOGGER.error("message of commit is null, record is {}.", JsonUtil.toString(record));
    }
}

任何帮助将不胜感激~

【问题讨论】:

    标签: java apache-kafka spring-kafka


    【解决方案1】:

    也许这就是答案。 MANUAL 表示不立即提交,它将更新此映射。 MANUAL_IMMEDIATE 表示立即提交。

    【讨论】:

    • 没错。使用 MANUAL,提交会在批处理完成后排队并应用。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-11
    • 2021-09-01
    相关资源
    最近更新 更多