【问题标题】:Kafka consumer stops consuming messagesKafka 消费者停止消费消息
【发布时间】:2017-09-23 02:08:53
【问题描述】:

我有一个简单的 kafka 设置。生产者正在以高速率向具有单个主题的单个分区生成消息。单个消费者正在使用来自该分区的消息。在此过程中,消费者可能会多次暂停处理消息。暂停可以持续几分钟。生产者停止生产消息后,所有排队的消息都将由消费者处理。消费者似乎没有立即看到生产者生成的消息。我正在使用卡夫卡 0.10.1.0。这里会发生什么?这是使用消息的代码部分:

            while (true)
            {
                try
                {
                    ConsumerRecords<String, byte[]> records = consumer.poll(100);
                    for (final ConsumerRecord<String, byte[]> record : records)
                    {
                        serviceThread.submit(() ->
                        {
                            externalConsumer.accept(record);
                        });
                    }
                    consumer.commitAsync();
                } catch (org.apache.kafka.common.errors.WakeupException e)
                {
                }
            }

consumer 是一个 KafkaConsumer,禁用了自动提交,最大轮询记录为 100,会话超时为 30000。serviceThread 是一个 ExecutorService。

生产者只涉及 KafkaProducer.send 调用以发送 ProducerRecord。

broker 上的所有配置都保留为 kafka 默认值。

我还使用 kafka-consumer-groups.sh 来检查消费者不消费消息时发生的情况。但是当这种情况发生时,kafka-consumer-groups.sh 也会挂在那里,无法取回信息。有时它会触发消费者重新平衡。但并非总是如此。

【问题讨论】:

  • 您是否有机会为轮询超时设置一个更大的值并重试,而不是 100 毫秒?
  • 试了 1000 次,还是一样的行为。
  • “停止消费消息”是指KafkaConsumer#poll总是返回空吗?
  • 我知道这里发生了什么。 Kafka 代理可以在将消息写入磁盘之前对其进行累积。默认情况下,kafka 代理最多可以累积消息一分钟,而对累积的消息数量没有限制。在这些消息被刷新到磁盘之前,消费者无法访问这些消息。减少这些值后,它消除了消费者的停顿。
  • Kafka 保证的Client only sees committed messages. 的类似表达。

标签: java apache-kafka


【解决方案1】:

对于那些觉得这很有帮助的人。我经常遇到这个问题(当 kafka 静默停止消费时),而且每次它实际上都不是 Kafka 的问题。

通常是一些长时间运行或挂起的静默进程使 Kafka 无法提交偏移量。例如,试图连接到数据库的数据库客户端。如果您等待的时间足够长(例如,SQLAlchemy 和 Postgres 需要 15 分钟),您将看到一个异常将打印到 STDOUT,例如 connection timed out

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-11
    • 1970-01-01
    • 2022-10-23
    • 1970-01-01
    • 2017-11-09
    • 1970-01-01
    相关资源
    最近更新 更多