【问题标题】:Kafka-node suddenly consumes from offset 0Kafka-node 突然从偏移量 0 开始消费
【发布时间】:2020-05-17 05:44:44
【问题描述】:

有时,kafka-node 消费者从偏移量 0 开始消费,而它的默认行为是只消费较新的消息。然后它不会切换回其默认行为。你知道如何解决这个问题,会发生什么以及它的行为突然改变吗?代码非常简单,无需更改代码即可实现。

var kafka = require("kafka-node") ;
  Consumer = kafka.Consumer;
  client = new kafka.KafkaClient();


  consumer = new Consumer(client, [{ topic: "Topic_23", partition: 0}
                                    ]);


consumer.on("message", function(message) {

    console.log(message)


  });

目前我找到的唯一解决方案是更改 kafka 主题。然后一切正常。有任何想法吗 ?

【问题讨论】:

  • 您能分享一下您的 Kafka Consumer 的选项吗?
  • 乔治,没有。只有我提供的默认设置。

标签: node.js apache-kafka kafka-consumer-api node-kafka


【解决方案1】:

在 Kafka 中,偏移量不与特定消费者相关联,而是与消费者组相关联。在您的代码中,您没有提供消费者组,因此,每次启动消费者时,它都会被分配给不同的消费者组,因此偏移量从0 开始。

以下内容应该可以解决问题(显然,在您第一次阅读所有消息时):

var kafka = require("kafka-node") ;

Consumer = kafka.Consumer;
client = new kafka.KafkaClient();

payload = [{
    topic: "Topic_23", 
    partition: 0
}]

var options = {
    groupId: 'test-consumer-group',
    fromOffset: 'latest'
};


consumer = new Consumer(client, payload, options);
consumer.on("message", function(message) {
    console.log(message)
  });

【讨论】:

  • 我需要将 [] 添加到有效负载消息中,否则会抛出:TypeError: topics.some is not a function。如果我添加它,那么它会再次从头开始读取所有消息。不幸的是,行为没有改变。
  • 你运行了两次吗?
  • 另外,你能分享./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list yourHost:9092 --topic Topic_23 的输出吗?
猜你喜欢
  • 2016-10-23
  • 2019-01-18
  • 2019-04-11
  • 2016-11-20
  • 1970-01-01
  • 2016-02-14
  • 2017-12-13
  • 1970-01-01
  • 2018-06-28
相关资源
最近更新 更多