【问题标题】:Logstash kafka input plugin unable to read any messages with new consumer and by setting auto_offset_reset to earliestLogstash kafka 输入插件无法读取新消费者的任何消息,并将 auto_offset_reset 设置为最早
【发布时间】:2016-12-23 00:57:47
【问题描述】:

我正在使用 Logstash Kafka 输入插件从主题中读取消息。我早些时候能够开始新的消费者——属于新的消费者组,并且通过设置 auto_offset_reset=earliest 能够从主题开始消费消息。

插件配置:

input {     
    kafka {         
    bootstrap_servers => "localhost:9092"
        topics => ["test_topic"]
        group_id => "new_consumer"
        client_id => "new_consumer"
        consumer_threads => 1
        auto_offset_reset => "earliest"   
  } 
}

但现在我注意到一个奇怪的行为。即使这是属于新消费者组的新消费者并且 auto_offset_reset 设置为“最早”,我也无法消费任何消息。

启用的调试日志如下是行为: 它清楚地表明消费者没有以前的偏移量,突然获取了分区偏移量,消费者使用它并设置了新的偏移量(请注意:之前从主题中读取了 36387 条消息,因此下面的日志中的数字)

[2016-12-22T16:45:13,454][信息 ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 已成功加入第 1 代组 new_consumer

[2016-12-22T16:45:13,455][信息 ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 为组设置新分配的分区 [test_topic-0] 新消费者

[2016-12-22T16:45:13,456][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 组 new_consumer 获取分区的已提交偏移量: [test_topic-0]

[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 组 new_consumer 没有为分区 test_topic-0 提交的偏移量

[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] 将分区 test_topic-0 的偏移量重置为最早的偏移量。

[2016-12-22T16:45:13,546][DEBUG][org.apache.kafka.clients.NetworkClient] 在 localhost:9092 启动到节点 0 的连接。

[2016-12-22T16:45:13,657][调试][logstash.instrument.collector] 收集器:向观察者发送快照 {:created_at=>2016-12-22 16:45:13 -0800}

[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] 添加了名为 node-0.bytes-sent 的传感器

[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] 添加了名为 node-0.bytes-received 的传感器

[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] 添加了名为 node-0.latency 的传感器

[2016-12-22T16:45:13,742][调试][org.apache.kafka.clients.NetworkClient] 已完成与节点 0 的连接

[2016-12-22T16:45:13,901][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] 获取分区 test_topic-0 的偏移量 36387

[2016-12-22T16:45:18,050][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 为分区 test_topic-0 分组 newconsumer 提交的偏移量 36387

[2016-12-22T16:45:18,563][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 为分区 test_topic-0 分组 newconsumer 提交的偏移量 36387

谁能告诉我为什么我们会看到这种行为?

【问题讨论】:

  • 您使用的是哪个 Logstash 和 Kafka 版本?
  • 很抱歉没有早点添加。 Logstash 5.0.2 和 Kafka 2.0.10
  • 2.10 是 Scala 版本。目前Kafka是0.10.1.0版本,是你用的那个吗?
  • 是的,你是对的。 Scala 2.10 和 kafka v0.10.0.1

标签: java apache-kafka logstash kafka-producer-api


【解决方案1】:

是否可以根据配置的保留期限删除旧邮件?可能偏移量 36387 是最早的偏移量,并且所有较早的消息都已过期。默认保留期为 7 天。

【讨论】:

  • 感谢您的回答。让我检查一下消息的保留期限。
  • 谢谢@hans。这确实是问题
猜你喜欢
  • 2021-04-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-11-09
  • 2023-03-22
  • 2018-04-25
  • 2020-05-21
  • 1970-01-01
相关资源
最近更新 更多