【发布时间】:2020-02-05 19:07:37
【问题描述】:
我正在构建一个 Java 8 应用程序,该应用程序在 Kafka 主题中查询一条消息。每个请求都会创建一个新的 Consumer 对象(独立于任何现有的 Consumer 对象),它会轮询我的 Kafka 主题,获取一条记录,然后关闭 Consumer。这种情况每天发生约 20 万次,每个请求都独立于所有其他请求,所以我认为我不能重用消费者。基本上,用户从主题请求消息,并为他们创建消费者,然后关闭。这平均每秒发生约 2 次,但是是任意的,所以它可以发生 10 次/秒或 1 次/小时,没有办法知道。
一段时间后,Kafka 服务器(不是运行代码的服务器,而是运行 Kafka 的实际服务器)上的堆大小变得巨大,垃圾收集无法清除它。最终,用于 GC 的 CPU 时间比其他任何事情都多,并且在我重新启动 Kafka 之前一切都崩溃了。
这是导致问题的代码的近似版本,while(true) 近似真实行为(在生产中,消费者不是在 while 循环中创建的,而是在用户请求来自主题):
Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);
while(true){
Consumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("TOPIC", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToEnd(Arrays.asList(tp));
// I've narrowed down the memory leak to this line
ConsumerRecords<String, String> cr = consumer.poll(1000);
// If I remove this line ^, the memory leak does not happen
/* CODE TO GET ONE RECORD */
consumer.unsubscribe();
consumer.close();
}
在 20 个 JVM 上运行此代码会在大约 20 分钟内导致内存泄漏。这是 Kafka 服务器上的堆(蓝色)和 GC 暂停时间(绿色)的样子:
我做错了什么(或者有更好的方法来解决这个问题),还是当创建和关闭大量消费者时,这是 Kafka 中的一个错误?
我在客户端运行 Kafka 0.10.2.1,在服务器端运行 Kafka 0.10.2.0。
【问题讨论】:
-
创建消费者的成本相对较高。你一遍又一遍地做。
-
@ftr 注意堆消耗在服务器上,只创建消费者不会造成泄漏。轮询部分似乎造成了泄漏。
-
经纪人还为每个新消费者做家务。
-
@ftr 如果我删除进行轮询的行,内存泄漏就会消失,所以你是对的,但是如果没有轮询,经纪人会很好地处理家务(并且会产生费用)创造消费者)。所以这不是理想的解决方案,但我仍然认为当我现在使用它的方式使用时,Kafka 中存在一个错误。
-
我认为每个消费者的 JMX 指标永远不会从消费者地图中删除/清除。随着时间的推移,这会造成泄漏。使用 JMX 检查消费者条目数量的命令: echo -e "open $(pgrep -f kafkaServer)\nbeans\n" | java -jar ~/Downloads/jmxterm-1.0-alpha-4-uber.jar | grep 客户端 ID=消费者 | wc -l ```
标签: java memory-leaks apache-kafka