【发布时间】:2023-03-27 10:46:02
【问题描述】:
Kafka 消费者应用程序具有严重的延迟(在高峰时段没有足够快地消耗 kafka 事件)。 kafka topic有120个partition,consumer group一共有30个host,每个host有两个consumer,所以每个consumer从2个kafka partition消费。我们使用的主机是 32 核的 AWS C5.9xlarge 实例。每个消费者都被放入一个 java.lang.Thread 中,并且在每个线程中,创建一个包含 250 个线程的 ThreadPool。
我们已经验证了 CPU/内存/IO 都不是瓶颈。然后我们将 250 名工人增加到 500 名工人,但延迟仍然存在。然后我们又改回了 250 个 worker,但是每台主机从 2 个消费者增加到 4 个消费者。结果,每个消费者从一个 kafka 分区消费。现在问题解决了,延迟下降到非常低。
我的问题是,为什么线程池中从 250 个增加到 500 个没有帮助,但每个主机从 2 个增加到 4 个消费者有帮助?
private class ConsumerThread extends Thread {
public ConsumerThread(StremProcessor processor) {
this.processor = processor;
this.consumer = new KafkaConsumer()
}
@Override
public void run() {
ExecutorService executor = Executors.newFixedThreadPool(250);
while (true) {
Data data = consumer.poll()
executor.invokeAll(getTasks(data, processor)); //processor is
}
}
}
【问题讨论】:
标签: multithreading apache-kafka threadpool executorservice consumer