【问题标题】:Kafka Node - How to retrieve all messages on a compacted topicKafka 节点 - 如何检索压缩主题上的所有消息
【发布时间】:2018-08-25 09:29:48
【问题描述】:

我正在尝试使用 kafka-node 从 kafka 主题中读取压缩消息。

问题是最近插入的消息留在 EOL 上方,并且在插入其他消息之前无法访问。实际上,在 EOL 和 High Water Offset 之间存在差距,这会阻止阅读最新消息。不清楚这是为什么。

已创建一个主题

kafka-topics.sh --zookeeper ${KAFKA_HOST}:2181 --create --topic atopic --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0" --partitions 1 --replication-factor 1

主题中产生了许多关键值。有些键是相同的。

var client = new kafka.KafkaClient({kafkaHost: "<host:port>",autoConnect: true})
var producer = new HighLevelProducer(client);
  producer.send(payload, function(error, result) {
  debug('Sent payload to Kafka: ', payload);
  if (error) {
    console.error(error);
  } else {
   res(true)
  }
  client.close()
 });
});

这里是插入的键和值

key - 1
key2 - 1
key3 - 1
key - 2
key2 - 2
key3 - 2
key1 - 3
key - 3
key2 - 3
key3 - 3

然后请求了一组主题键。

var options = {
        id: 'consumer1',
        kafkaHost: "<host:port>",
        groupId: "consumergroup1",
        sessionTimeout: 15000,
        protocol: ['roundrobin'],
        fromOffset: 'earliest'
      };
      var consumerGroup = new ConsumerGroup(options, topic);
        consumerGroup.on('error', onError);
        consumerGroup.on('message', onMessage);
        consumerGroup.on('done', function(message) {
          consumerGroup.close(true,function(){ });
        })
        function onError (error) {
          console.error(error);
        }
        function onMessage (message) {)
            console.log('%s read msg Topic="%s" Partition=%s Offset=%d HW=%d', this.client.clientId, message.topic, message.partition, message.offset, message.highWaterOffset, message.value);
        }
      })
结果令人惊讶:
consumer1 read msg Topic="atopic" Partition=0 Offset=4 highWaterOffset=10 Key=key2 value={"name":"key2","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=5 highWaterOffset=10 Key=key3 value={"name":"key3","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=6 highWaterOffset=10 Key=key1 value={"name":"key1","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=7 highWaterOffset=10 Key=key value={"name":"key","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=

有一个高水位偏移,代表最新值为 10。 然而,消费者看到的偏移值只有 7。不知何故,压缩会阻止消费者看到最新的消息。

目前尚不清楚如何避免这种约束并让消费者看到最新消息。

任何建议表示赞赏。谢谢。

【问题讨论】:

  • 您的代码与输出不匹配...控制台消费者是否也显示此问题?
  • 我确实遇到了同样的问题,使用消费者而不是消费者组(如果这是你的问题)。 @cricket_007
  • No... 你的代码是 HW=%d ,你的输出是 highWaterOffset。如果您对代码的输出有疑问,那么如果您显示生成它的代码就会很有意义。在任何情况下,主题都是压缩的,因此生产者可以偏移 10,但主题本身只有 7 个值。

标签: javascript apache-kafka


【解决方案1】:

不知何故,压缩会阻止消费者看到最新消息。

是的,您错过了一些消息,但您也看到了其他消息。

压缩正在删除较早的键。

请注意根本没有 url - 1

Key=key2 value={"name":"key2","url":"2"}
Key=key3 value={"name":"key3","url":"2"}
Key=key1 value={"name":"key1","url":"3"}
Key=key value={"name":"key","url":"3"}

那是因为您为同一个键发送了新值。

而你发送了 10 条消息,所以主题的高水位偏移量是 10

您的代码不一定看起来有问题,但您应该还有两个 3 值。打印的偏移量对应于这个逻辑。

key  - 1 | 0
key2 - 1 | 1
key3 - 1 | 2
key  - 2 | 3
key2 - 2 | 4
key3 - 2 | 5
key1 - 3 | 6
key  - 3 | 7
key2 - 3 | 8
key3 - 3 | 9

一般来说,我建议不要让 Kafka 尝试压缩主题并以每秒 10 倍的速度写入日志段,以及使用不同的库,例如 node-rdkafka

【讨论】:

    【解决方案2】:

    在使用 kafka 进行更多工作之后,似乎 kafka-node api 具有以下行为(我认为这实际上源自 kafka 本身)。

    当在 highWaterOff 之前查询消息时,只有达到 highWaterOffset 的消息才会返回到 ConsumerGroup。如果消息没有被复制,这是有道理的,因为组中的另一个消费者不一定会看到这些消息。

    仍然可以使用 Consumer 而不是 ConsumerGroup 并通过查询特定分区来请求和接收超出 highWaterOffset 的消息。

    此外,当偏移量不一定是 latestOffset 时,似乎会触发“完成”事件。在这种情况下,有必要在 message.offset+1 处提交进一步的查询。如果您继续这样做,您可以获得最新偏移量的所有消息。

    我不清楚为什么 kafka 有这种行为,但可能有一些较低级别的细节可以显示这种紧急行为。

    【讨论】:

      猜你喜欢
      • 2019-09-20
      • 1970-01-01
      • 1970-01-01
      • 2020-03-05
      • 2019-06-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-22
      相关资源
      最近更新 更多