【问题标题】:kafka-node consumer receives offsetOutOfRange errorkafka-node 消费者收到 offsetOutOfRange 错误
【发布时间】:2015-11-18 10:11:16
【问题描述】:

我正在使用 kafka-node(kafka 的节点客户端),使用消费者来检索有关主题的消息。不幸的是,我收到了“offsetOutOfRange”条件(调用了 offsetOutOfRange 回调)。我的应用程序运行良好,直到消费者明显落后于生产者,在最早的偏移量和最新的偏移量之间留下了相当大的差距。在这一点上,我(可能是错误的)假设消费者将能够继续接收消息(并希望赶上生产者)。

我的kafka消费者客户端代码如下:

:
:
var kafka = require('kafka-node');

var zookeeper = "10.0.1.201:2181";
var id = "embClient";

var Consumer = kafka.Consumer;
var client = new kafka.Client(zookeeper, id);
var consumer = new Consumer( client, [ { topic: "test", partition: 0 } ], { autoCommit: false } );

consumer.on('error', [error callback...]);

consumer.on('offsetOutOfRange', [offset error callback...]);

consumer.on('message', [message callback...]);
:
:

我做错了什么,还是错过了什么?

如果没有,我有几个问题:

(a) 是否有一种公认的“最佳”方式来编写客户端以优雅地处理这种情况?

(b) 为什么会提出这个条件? (我假设客户端应该能够继续阅读它停止的消息,最终(理想情况下)赶上......)

(c) 我是否需要编写代码/逻辑来处理这种情况,并明确地重新定位消费者偏移以读取? (这似乎有点麻烦)...

感谢任何帮助。

【问题讨论】:

    标签: node.js apache-kafka kafka-consumer-api


    【解决方案1】:

    我认为该应用可能会尝试读取 Kafka 中不再可用的消息。 Kafka 根据 log.retention.* 属性删除旧消息。假设您向 Kafka 发送了 1000 条消息。由于保留,Kafka 删除了前 500 条消息。如果您的应用程序尝试读取消息 350,它将失败并引发 offsetOutOfRange 错误。这可能是因为您的消费者速度太慢,以至于 Kafka 代理已经在您的消费者处理消息之前删除了消息。或者您的消费者崩溃了,但最后处理的消息的偏移量保存在某处。

    您可以使用Offset class 检索最新/最早的可用偏移量(参见方法fetch)并更新消费者的偏移量。我们使用这种方法。

    一般来说,当这种情况发生时,很难判断应用程序应该做什么,因为很明显有些地方很不对劲。

    希望对你有帮助, 卢卡什

    【讨论】:

    • 根据您的回答进行了一番调查后,我发现情况确实如此——Kafka已经删除了消息(这是一个测试系统,所以没有真正的问题)。谢谢。
    • 您介意分享一下您是如何处理这个问题的。您是否明确地将偏移量设置为其他值?
    • 是的,我们通常根据当前要求将其明确设置为最旧或最新的偏移量。
    • 最新版本中的ConsumerGroup 现在可以从这个问题中正常恢复。提供outOfRangeOffset 选项以从earliestlatest 可用偏移中读取。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-06-06
    • 2016-05-15
    • 1970-01-01
    • 2020-01-10
    • 2020-04-12
    • 2020-10-16
    • 2018-09-12
    相关资源
    最近更新 更多