【发布时间】:2017-09-23 02:08:53
【问题描述】:
我有一个简单的 kafka 设置。生产者正在以高速率向具有单个主题的单个分区生成消息。单个消费者正在使用来自该分区的消息。在此过程中,消费者可能会多次暂停处理消息。暂停可以持续几分钟。生产者停止生产消息后,所有排队的消息都将由消费者处理。消费者似乎没有立即看到生产者生成的消息。我正在使用卡夫卡 0.10.1.0。这里会发生什么?这是使用消息的代码部分:
while (true)
{
try
{
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (final ConsumerRecord<String, byte[]> record : records)
{
serviceThread.submit(() ->
{
externalConsumer.accept(record);
});
}
consumer.commitAsync();
} catch (org.apache.kafka.common.errors.WakeupException e)
{
}
}
consumer 是一个 KafkaConsumer,禁用了自动提交,最大轮询记录为 100,会话超时为 30000。serviceThread 是一个 ExecutorService。
生产者只涉及 KafkaProducer.send 调用以发送 ProducerRecord。
broker 上的所有配置都保留为 kafka 默认值。
我还使用 kafka-consumer-groups.sh 来检查消费者不消费消息时发生的情况。但是当这种情况发生时,kafka-consumer-groups.sh 也会挂在那里,无法取回信息。有时它会触发消费者重新平衡。但并非总是如此。
【问题讨论】:
-
您是否有机会为轮询超时设置一个更大的值并重试,而不是 100 毫秒?
-
试了 1000 次,还是一样的行为。
-
“停止消费消息”是指KafkaConsumer#poll总是返回空吗?
-
我知道这里发生了什么。 Kafka 代理可以在将消息写入磁盘之前对其进行累积。默认情况下,kafka 代理最多可以累积消息一分钟,而对累积的消息数量没有限制。在这些消息被刷新到磁盘之前,消费者无法访问这些消息。减少这些值后,它消除了消费者的停顿。
-
Kafka 保证的
Client only sees committed messages.的类似表达。
标签: java apache-kafka