【发布时间】:2016-12-23 00:57:47
【问题描述】:
我正在使用 Logstash Kafka 输入插件从主题中读取消息。我早些时候能够开始新的消费者——属于新的消费者组,并且通过设置 auto_offset_reset=earliest 能够从主题开始消费消息。
插件配置:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["test_topic"]
group_id => "new_consumer"
client_id => "new_consumer"
consumer_threads => 1
auto_offset_reset => "earliest"
}
}
但现在我注意到一个奇怪的行为。即使这是属于新消费者组的新消费者并且 auto_offset_reset 设置为“最早”,我也无法消费任何消息。
启用的调试日志如下是行为: 它清楚地表明消费者没有以前的偏移量,突然获取了分区偏移量,消费者使用它并设置了新的偏移量(请注意:之前从主题中读取了 36387 条消息,因此下面的日志中的数字)
[2016-12-22T16:45:13,454][信息 ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 已成功加入第 1 代组 new_consumer
[2016-12-22T16:45:13,455][信息 ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 为组设置新分配的分区 [test_topic-0] 新消费者
[2016-12-22T16:45:13,456][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 组 new_consumer 获取分区的已提交偏移量: [test_topic-0]
[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 组 new_consumer 没有为分区 test_topic-0 提交的偏移量
[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] 将分区 test_topic-0 的偏移量重置为最早的偏移量。
[2016-12-22T16:45:13,546][DEBUG][org.apache.kafka.clients.NetworkClient] 在 localhost:9092 启动到节点 0 的连接。
[2016-12-22T16:45:13,657][调试][logstash.instrument.collector] 收集器:向观察者发送快照 {:created_at=>2016-12-22 16:45:13 -0800}
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] 添加了名为 node-0.bytes-sent 的传感器
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] 添加了名为 node-0.bytes-received 的传感器
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] 添加了名为 node-0.latency 的传感器
[2016-12-22T16:45:13,742][调试][org.apache.kafka.clients.NetworkClient] 已完成与节点 0 的连接
[2016-12-22T16:45:13,901][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] 获取分区 test_topic-0 的偏移量 36387
[2016-12-22T16:45:18,050][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 为分区 test_topic-0 分组 newconsumer 提交的偏移量 36387
[2016-12-22T16:45:18,563][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 为分区 test_topic-0 分组 newconsumer 提交的偏移量 36387
谁能告诉我为什么我们会看到这种行为?
【问题讨论】:
-
您使用的是哪个 Logstash 和 Kafka 版本?
-
很抱歉没有早点添加。 Logstash 5.0.2 和 Kafka 2.0.10
-
2.10 是 Scala 版本。目前Kafka是0.10.1.0版本,是你用的那个吗?
-
是的,你是对的。 Scala 2.10 和 kafka v0.10.0.1
标签: java apache-kafka logstash kafka-producer-api