【问题标题】:CommitFailedException Commit cannot be completed since the group has already rebalanced and assigned the partitions to another memberCommitFailedException 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员
【发布时间】:2018-01-15 13:08:28
【问题描述】:

我使用的是 kafka 0.10.2,现在遇到了 CommitFailedException。喜欢:

提交无法完成,因为该组已经重新平衡并且 将分区分配给另一个成员。这意味着时间 对 poll() 的后续调用之间的时间比配置的长 max.poll.interval.ms,这通常意味着轮询循环是 花费太多时间处理消息。您可以解决这个问题 通过增加会话超时或减少最大大小 使用 max.poll.records 在 poll() 中返回的批次。

我已将 max.poll.interval.ms 设置为 Integer.MAX_VALUE。那么任何人都可以告诉我为什么即使我设置了值仍然会发生这种情况?

另一个问题是: 我按照描述将 session.timeout.ms 设置为 60000,它仍然会发生。我尝试通过一个简单的代码来重现

 public static void main(String[] args) throws InterruptedException {     
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            //Thread.sleep(11000);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

当我将 session.timeout.ms 设置为 10000 时,我尝试在轮询循环中睡眠超过 10000 毫秒,但它似乎工作并且没有异常输出。所以我对此感到困惑。如果心跳是由 consumer.poll 和 consumer.commit 触发的,那么在我的代码中,心跳似乎超出了会话超时。为什么不抛出 CommitFailedException ?

【问题讨论】:

  • 虽然用户线程挂了10多秒,但是心跳线程仍然可以正常发出心跳,所以没有抛出异常,这也是引入max.poll.intervals.ms的原因。我感兴趣的是为什么当 max.poll.intervals.ms 设置为 Integer.MAX_VALUE 时您仍然会收到 CommitFailException。
  • 这是我的问题,我也很困惑。 ..
  • 能否轻松复制或仅发生一次?

标签: java apache-kafka


【解决方案1】:

在消费者上设置的session.timeout.ms 应该小于在 Kafka 代理上设置的group.max.session.timeout.ms

这为我解决了这个问题。

感谢github链接Commit Failures

【讨论】:

【解决方案2】:

您好为此,您需要处理代码中的重新平衡条件,并且应该在重新平衡之前处理正在进行的消息并提交它

喜欢:

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Implement what you want to do once rebalancing is done.
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // commit current method
    }
}

并使用此语法订阅主题:

kafkaConsumer.subscribe(topicNameList , new HandleRebalance())

这样做的好处:

  1. 进行再平衡时消息不会重复。

  2. 没有提交失败异常

【讨论】:

    猜你喜欢
    • 2018-01-17
    • 2018-07-04
    • 2017-02-06
    • 2019-01-08
    • 1970-01-01
    • 2013-05-18
    • 2017-01-12
    • 2017-10-29
    • 2018-04-25
    相关资源
    最近更新 更多