【发布时间】:2018-01-15 13:08:28
【问题描述】:
我使用的是 kafka 0.10.2,现在遇到了 CommitFailedException。喜欢:
提交无法完成,因为该组已经重新平衡并且 将分区分配给另一个成员。这意味着时间 对 poll() 的后续调用之间的时间比配置的长 max.poll.interval.ms,这通常意味着轮询循环是 花费太多时间处理消息。您可以解决这个问题 通过增加会话超时或减少最大大小 使用 max.poll.records 在 poll() 中返回的批次。
我已将 max.poll.interval.ms 设置为 Integer.MAX_VALUE。那么任何人都可以告诉我为什么即使我设置了值仍然会发生这种情况?
另一个问题是: 我按照描述将 session.timeout.ms 设置为 60000,它仍然会发生。我尝试通过一个简单的代码来重现
public static void main(String[] args) throws InterruptedException {
Logger logger = Logger.getLogger(KafkaConsumer10.class);
logger.info("XX");
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker:9098");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.interval.ms", "300000");
props.put("session.timeout.ms", "10000");
props.put("max.poll.records", "2");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("t1"));
while (true) {
Thread.sleep(11000);
ConsumerRecords<String, String> records = consumer.poll(100);
//Thread.sleep(11000);
Thread.sleep(11000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
当我将 session.timeout.ms 设置为 10000 时,我尝试在轮询循环中睡眠超过 10000 毫秒,但它似乎工作并且没有异常输出。所以我对此感到困惑。如果心跳是由 consumer.poll 和 consumer.commit 触发的,那么在我的代码中,心跳似乎超出了会话超时。为什么不抛出 CommitFailedException ?
【问题讨论】:
-
虽然用户线程挂了10多秒,但是心跳线程仍然可以正常发出心跳,所以没有抛出异常,这也是引入
max.poll.intervals.ms的原因。我感兴趣的是为什么当max.poll.intervals.ms设置为 Integer.MAX_VALUE 时您仍然会收到 CommitFailException。 -
这是我的问题,我也很困惑。 ..
-
能否轻松复制或仅发生一次?
标签: java apache-kafka