【发布时间】:2020-01-31 13:20:05
【问题描述】:
我正在尝试模拟 Kafka 抛出的 CommitFailedException。
我手动将“session.timeout.ms”设置为 10000 毫秒,将“enable.auto.commit”设置为 false。
在 Kafkaconsumer.poll() 之后,我得到了语句 Thread.sleep(12000),之后我进行了提交。我希望由于线程在下一次轮询之前需要 12 秒,因此应该将使用者标记为已死亡并且应该抛出 CommitFailedException。但是,该过程执行顺利。
如何模拟 KafkaConsumer 抛出的异常。
consumer.subscribe(Arrays.asList("foo"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
try {
Thread.sleep(12000);
}catch (Exception e){
e.printStackTrace();
}
consumer.commitSync();
}
【问题讨论】:
-
线程睡眠时间超过轮询间隔会导致消费者组重新平衡,而不是提交失败
-
我在同一个主题上创建了两个消费者。那么当重新平衡发生时,其他消费者应该已经收到了分区,当现有消费者提交时,不会抛出异常吗?
-
我不这么认为。我相信它是默默处理的。不过,您可以设置
ConsumerRebalanceListener -
您可能还想扫描 Kafka 源代码,看看是否可以找到围绕 CommitFailedException 的单元测试
标签: apache-kafka kafka-consumer-api