【问题标题】:Logstash 5.1.1 kafka input doesn't pick up existing messages on topicLogstash 5.1.1 kafka 输入不会获取有关主题的现有消息
【发布时间】:2017-11-17 20:42:12
【问题描述】:

我有以下带有 kafka 输入的 logstash 配置

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["mytopic"]
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my_index"
    codec => "json"
    document_id => "%{id}"
    doc_as_upsert => true
    action => "update"
  }
}

我面临的问题是,当我运行 logstash 时,它不会拾取有关该主题的旧消息。我的印象是,第一次运行 logstash 时,它会拾取有关主题的所有尚未使用的消息。我检查了这是一个新主题,并且其中有一些消息,当它开始运行时,logstash 没有接收到这些消息。它确实会在运行时接收有关该主题的消息,但不会接收在其开始之前存在的消息。我是在配置中遗漏了什么,还是输入本身的怪癖。消息的保证对我的业务需求至关重要。

【问题讨论】:

    标签: elasticsearch apache-kafka logstash apache-zookeeper


    【解决方案1】:

    由于您没有为 kafka 指定组 id,因此重要的注意事项如下:

    • Kafka group.id(logstash kafka 配置中的 group_id)设置为 logstash 的默认值,即“logstash”
    • logstash 中 enable.auto.commit (enable_auto_commit) 的默认 Kafka 值为“true”
    • Kafka auto.offset.reset (auto_offset_reset) 在 logstash 中没有默认值,所以我假设使用的是 Kafka 默认值 latest。

    因此,当您在某个主题上运行消费者并且它无法获取该主题中已有的消息时,可能会发生以下两种情况之一:

    1. 不存在与消费者具有相同组 ID 的现有组,因此使用 Kafka 默认 auto.offset.reset 值 latest 并且消费者将忽略已经存在的消息。
    2. 存在具有相同组 ID(“logstash”)的现有组,并且具有该组 ID 的某些使用者已经使用了现有消息并提交了偏移量(该其他使用者可能是您之前运行的或某些其他具有相同组 id 的消费者)。这意味着该组下的其他消费者不会重新使用这些消息,除非以某种方式明确告知这样做。

    所以你可能想要做的是设置一些 Kafka 配置,对于 logstash 你应该能够设置

    group_id => "some_random_group"

    auto_offset_reset => "最早的"

    如果你现在运行消费者,由于 some_random_group 没有现有的偏移量并且重置是最早的,消费者应该消费一个主题中的所有现有消息并提交偏移量。这意味着如果在消费完所有消息后再次运行消费者,它不会消费现有的消息。

    【讨论】:

    • 谢谢。这很有意义。我们确实有多个消费者在机器上运行,但主题不同。并且没有指定 group_id(默认为 logstash)。那会起到一定的作用。在生产中,我只打算拥有一个消费者,但在开发中,任何人都可以创建消费者,所以我想知道是否所有这些消费者都需要拥有唯一的组 ID。
    • 如果您为多个消费者指定了相同的组 id,即他们组成了该消费者组,他们将获得大致相同数量的消息。因此,如果您在生产过程中运行具有相同组 id 的消费者,则会将消息发送给他们。在开发中,如果每个人不希望自己的测试干扰其他人的测试,我建议每个人使用不同的组 ID,反之亦然。
    【解决方案2】:

    您应该将 kafka 输入插件设置 auto_offset_reset 设置为“最早”。

    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        auto_offset_reset => "earliest"
        topics => ["mytopic"]
      }
    }
    

    【讨论】:

    • auto_offset_reset 值仅在 Kafka 无法为消费者找到现有组时使用。如果这对您不起作用,则意味着存在一个现有组,并且该组中的某些使用者已经消费了消息并提交了偏移量。看看我的回答。
    猜你喜欢
    • 2023-03-22
    • 1970-01-01
    • 1970-01-01
    • 2016-01-07
    • 2015-08-01
    • 1970-01-01
    • 2023-03-28
    • 2020-12-12
    • 2021-10-26
    相关资源
    最近更新 更多