【问题标题】:Simulate Kafka CommitFailedException模拟 Kafka CommitFailedException
【发布时间】:2020-01-31 13:20:05
【问题描述】:

我正在尝试模拟 Kafka 抛出的 CommitFailedException。

我手动将“session.timeout.ms”设置为 10000 毫秒,将“enable.auto.commit”设置为 false。

在 Kafkaconsumer.poll() 之后,我得到了语句 Thread.sleep(12000),之后我进行了提交。我希望由于线程在下一次轮询之前需要 12 秒,因此应该将使用者标记为已死亡并且应该抛出 CommitFailedException。但是,该过程执行顺利。

如何模拟 KafkaConsumer 抛出的异常。

consumer.subscribe(Arrays.asList("foo"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }

            try {
                Thread.sleep(12000);
            }catch (Exception e){
                e.printStackTrace();
            }
            consumer.commitSync();
        }

【问题讨论】:

  • 线程睡眠时间超过轮询间隔会导致消费者组重新平衡,而不是提交失败
  • 我在同一个主题上创建了两个消费者。那么当重新平衡发生时,其他消费者应该已经收到了分区,当现有消费者提交时,不会抛出异常吗?
  • 我不这么认为。我相信它是默默处理的。不过,您可以设置ConsumerRebalanceListener
  • 您可能还想扫描 Kafka 源代码,看看是否可以找到围绕 CommitFailedException 的单元测试

标签: apache-kafka kafka-consumer-api


【解决方案1】:

Kafka 通过单独的线程使用心跳机制来检查消费者的健康状况。消费者心跳线程必须在session.timeout.ms时间到期之前向代理发送心跳。

heartbeat.interval.ms: 心跳到 使用 Kafka 的组管理设施时的消费者协调员。 心跳用于确保消费者的会话保持活动状态 并在新消费者加入或离开时促进再平衡 组。

session.timeout.ms: 用于检测客户端故障的超时时间 使用 Kafka 的组管理工具。客户端定期发送 心跳以向代理指示其活跃性。如果没有心跳 在此会话到期之前由经纪人收到 超时,则代理将从组中删除该客户端,并 启动重新平衡。

检查消费者活跃度的另一种机制是轮询。消费者应该 poll() 而不会过期 max.poll.interval.ms。如果这个时间到期(通常长时间运行的进程会导致这个问题),消费者再次被认为是死的。

ma​​x.poll.interval.ms: poll() 调用之间的最大延迟 使用消费者组管理时。这设置了一个上限 消费者在获取更多信息之前可以空闲的时间量 记录。如果在此超时到期之前未调用 poll(), 然后消费者被认为是失败的,该组将重新平衡 为了将分区重新分配给另一个成员。

如果由于session.timeout.ms 中没有心跳或max.poll.interval.ms 中没有轮询,Kafka 认为消费者已死亡,则消费者无法提交消息并获得CommitFailedException

CommitFailedException: 当使用 KafkaConsumer.commitSync() 的偏移提交失败并出现不可恢复的情况时,会引发此异常 错误。当组重新平衡在 commit 可以成功应用。在这种情况下,提交不能 通常会重试,因为某些分区可能已经 已分配给组中的另一个成员。

因此;因为心跳线程是一个单独的线程,所以代码中的睡眠不会影响它。但在您的情况下,您可以将 max.poll.interval.ms 设置为 10 秒以获取 CommitFailedException

【讨论】:

    猜你喜欢
    • 2020-07-10
    • 1970-01-01
    • 2016-06-10
    • 2022-01-11
    • 2021-10-23
    • 1970-01-01
    • 2021-10-17
    • 1970-01-01
    • 2020-04-06
    相关资源
    最近更新 更多