【发布时间】:2021-03-12 03:36:12
【问题描述】:
版本:2.1.11
问题: 我有一些错误的参数配置: 最大投票记录:500 max.poll.interval.ms:10000。 但消耗大约需要 25000 秒。所以它会抛出异常: org.apache.kafka.clients.consumer.CommitFailedException:提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加会话超时或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。
然后我更正了参数,这样这个错误就解决了。
但是,我很困惑,我在消费一条消息时已通过 ack.acknowledge() 提交。 为什么根本无法提交偏移量? 这是代码
@KafkaListener(topics = "${kafka.topic}")
public void consume(ConsumerRecord<String, SyncResMessage> record, Acknowledgment ack) {
try {
// consume message
consumerService.dealResource(record.value());
} catch (Exception e) {
LOGGER.error("error when consume data, data key is {}, exception is {}.", record.key(), e);
}
if (ack != null) {
LOGGER.info("commit successfully.");
ack.acknowledge();
} else {
LOGGER.error("message of commit is null, record is {}.", JsonUtil.toString(record));
}
}
任何帮助将不胜感激~
【问题讨论】:
标签: java apache-kafka spring-kafka