【问题标题】:Kafka broker memory leak triggered by many consumers许多消费者触发的Kafka经纪人内存泄漏
【发布时间】:2020-02-05 19:07:37
【问题描述】:

我正在构建一个 Java 8 应用程序,该应用程序在 Kafka 主题中查询一条消息。每个请求都会创建一个新的 Consumer 对象(独立于任何现有的 Consumer 对象),它会轮询我的 Kafka 主题,获取一条记录,然后关闭 Consumer。这种情况每天发生约 20 万次,每个请求都独立于所有其他请求,所以我认为我不能重用消费者。基本上,用户从主题请求消息,并为他们创建消费者,然后关闭。这平均每秒发生约 2 次,但是是任意的,所以它可以发生 10 次/秒或 1 次/小时,没有办法知道。

一段时间后,Kafka 服务器(不是运行代码的服务器,而是运行 Kafka 的实际服务器)上的堆大小变得巨大,垃圾收集无法清除它。最终,用于 GC 的 CPU 时间比其他任何事情都多,并且在我重新启动 Kafka 之前一切都崩溃了。

这是导致问题的代码的近似版本,while(true) 近似真实行为(在生产中,消费者不是在 while 循环中创建的,而是在用户请求来自主题):

Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);

while(true){
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("TOPIC", 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seekToEnd(Arrays.asList(tp));

    // I've narrowed down the memory leak to this line
    ConsumerRecords<String, String> cr = consumer.poll(1000); 
    // If I remove this line ^, the memory leak does not happen

    /* CODE TO GET ONE RECORD */

    consumer.unsubscribe();
    consumer.close();
}

在 20 个 JVM 上运行此代码会在大约 20 分钟内导致内存泄漏。这是 Kafka 服务器上的堆(蓝色)和 GC 暂停时间(绿色)的样子:

我做错了什么(或者有更好的方法来解决这个问题),还是当创建和关闭大量消费者时,这是 Kafka 中的一个错误?

我在客户端运行 Kafka 0.10.2.1,在服务器端运行 Kafka 0.10.2.0。

【问题讨论】:

  • 创建消费者的成本相对较高。你一遍又一遍地做。
  • @ftr 注意堆消耗在服务器上,只创建消费者不会造成泄漏。轮询部分似乎造成了泄漏。
  • 经纪人还为每个新消费者做家务。
  • @ftr 如果我删除进行轮询的行,内存泄漏就会消失,所以你是对的,但是如果没有轮询,经纪人会很好地处理家务(并且会产生费用)创造消费者)。所以这不是理想的解决方案,但我仍然认为当我现在使用它的方式使用时,Kafka 中存在一个错误。
  • 我认为每个消费者的 JMX 指标永远不会从消费者地图中删除/清除。随着时间的推移,这会造成泄漏。使用 JMX 检查消费者条目数量的命令: echo -e "open $(pgrep -f kafkaServer)\nbeans\n" | java -jar ~/Downloads/jmxterm-1.0-alpha-4-uber.jar | grep 客户端 ID=消费者 | wc -l ```

标签: java memory-leaks apache-kafka


【解决方案1】:

无论您收到的请求数量和频率如何,您仍然可以重用 KafkaConsumer 实例。您只能在请求到达时进行轮询,但您不需要每次都创建和关闭消费者。

话虽如此,如果内存使用量增加并且没有被 GC 回收,您对消费者的使用可能会暴露出代理上的内存管理问题。当生产者被频繁回收时,我已经看到报告代理用完直接内存的问题。因此,那里可能有改进的余地。可能最好在 issues.apache.org 上提出一张票来查看。

【讨论】:

    【解决方案2】:

    您每天要轮询 Kafka 约 20 万次,即每小时约 8k 次/每分钟约 140 次/每秒约两次 - 为什么每次都创建(并关闭)消费者的新实例?只需安排 KafkaConsumer 根据您所需的时间间隔触发(您可以为此使用 JDK ScheduledExecutorService)并重用相同的消费者实例

    【讨论】:

    • 我编辑了问题以更好地解释:在生产中,我无法控制请求何时发生(2/秒、100/秒、1/小时)。 while 循环仅用于复制行为和内存泄漏。我可能会尝试不同的方法,但我仍然认为 Kafka 在以我现在使用的方式使用时存在错误。
    • 老实说,无法评论“kafka bug”部分。我确实意识到 while 循环是用于模拟行为以及您无法控制生产设置的事实。但我仍然不明白的是'每个请求都独立于所有其他请求,所以我认为我不能重用消费者' - 这是问题的根源。
    【解决方案3】:

    Kafka 2.4.0(可能还有以前的版本)存在资源泄漏,其中某些 MBean 未在 Consumer.close() 上取消注册。

    这可能是您最初提出问题时的情况,当然 Tony 在上面的问题中建议这是 cmets 中的原因。

    https://issues.apache.org/jira/browse/KAFKA-9504?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22Consumer%20Leak%22

    【讨论】:

      猜你喜欢
      • 2021-10-13
      • 2016-06-02
      • 2020-11-13
      • 2018-04-15
      • 2013-10-01
      • 2016-01-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多