【问题标题】:How to read data using Kafka Consumer API from beginning?如何从一开始就使用 Kafka Consumer API 读取数据?
【发布时间】:2015-04-18 03:04:18
【问题描述】:

请谁能告诉我每次运行消费者时如何从一开始就使用 Kafka Consumer API 读取消息。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    在创建ConsumerConfig 时使用高级消费者集props.put("auto.offset.reset", "smallest");

    【讨论】:

    • 这只会确保您在第一次阅读时会从头开始阅读。后续读取将完全忽略此设置并从最后一个偏移量读取。
    • 应该是 props.put("auto.offset.reset", "earliest");
    【解决方案2】:

    1) https://stackoverflow.com/a/17084401/3821653

    2) http://mail-archives.apache.org/mod_mbox/kafka-users/201403.mbox/%3CCAOG_4QYz2ynH45a8kXb8qw7xw4vDRRwNqMn5j9ERFxJ8RfKGCg@mail.gmail.com%3E

    要重置消费者组,可以删除Zookeeper组id

     import kafka.utils.ZkUtils;
     ZkUtils.maybeDeletePath(<zkhost:zkport>, </consumers/group.id>);`
    

    【讨论】:

      【解决方案3】:

      这适用于 0.9.x 消费者。基本上,当您创建一个消费者时,您需要使用属性ConsumerConfig.GROUP_ID_CONFIG 为该消费者分配一个消费者组ID。每次启动消费者执行类似properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); 的操作时随机生成消费者组 ID(属性是您将传递给构造函数 new KafkaConsumer(properties) 的 java.util.Properties 的一个实例)。

      随机生成客户端意味着新的消费者组在 kafka 中没有与之关联的任何偏移量。所以我们在这之后要做的就是为这个场景制定一个策略。正如auto.offset.reset 属性的文档所说:

      当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办(例如,因为该数据已被删除):

      • 最早:自动将偏移量重置为最早的偏移量
      • latest:自动将偏移量重置为最新的偏移量
      • none:如果没有找到先前的偏移量或消费者所在的组,则向消费者抛出异常
      • 其他任何事情:向消费者抛出异常。

      所以从上面列出的选项中,我们需要选择earliest 策略,这样新的消费者组每次都从头开始。

      您在 java 中的代码将如下所示:

      properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
      properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
      properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      consumer = new KafkaConsumer(properties);
      

      您现在唯一需要弄清楚的是,当有多个消费者属于同一个消费者组但分布时,如何生成随机 id 并将其分布在这些实例之间,以便它们都属于同一个消费者团体。

      希望对你有帮助!

      【讨论】:

      • 这不是一个好的解决方案。这样做会导致 Zookeeper 数据堆积起来,不断地创建和放弃新的条目。最好按照下面 KingJulien 的指示删除您的组的条目以及他发布的链接答案。
      • 你需要关闭消费者。消费者.close();
      【解决方案4】:

      如果您使用的是 java consumer api,更具体地说是 org.apache.kafka.clients.consumer.Consumer,您可以尝试 seek* 方法。

      consumer.seekToBeginning(consumer.assignment())
      

      这里,consumer.assignment() 返回分配给给定消费者的所有分区,并且 seekToBeginning 将从给定分区集合的最早偏移量开始。

      【讨论】:

      • 这对我有用,但我需要轮询直到真正有分配:while(consumer.assignment().isEmpty()) { consumer.poll(Duration.ofMillis(0)); }。循环结束后,我正常轮询并从头开始接收所有记录
      • 感谢您的上述评论(关于等待分配) - 现在可以完美运行了。
      【解决方案5】:

      执行此操作的一个选项是每次启动时都有一个唯一的组 ID,这意味着 Kafka 会从一开始就向您发送主题中的消息。当您为KafkaConsumer 设置属性时,请执行以下操作:

      properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
      

      另一种选择是使用consumer.seekToBeginning(consumer.assignment()),但这将不起作用,除非 Kafka 首先通过让消费者调用 poll 方法从消费者那里获得心跳。所以调用poll(),然后调用seekToBeginning(),如果你想要从头开始的所有记录,再调用poll()。这有点老套,但从 0.9 版本开始,这似乎是最可靠的方法。

      // At this point, there is no heartbeat from consumer so seekToBeinning() wont work
      // So call poll()
      consumer.poll(0);
      // Now there is heartbeat and consumer is "alive"
      consumer.seekToBeginning(consumer.assignment());
      // Now consume
      ConsumerRecords<String, String> records = consumer.poll(0);
      

      【讨论】:

      • 这个解决方案是不是创建了很多不必要的消费者元数据!
      【解决方案6】:

      一种可能的解决方案是在订阅一个或多个主题时使用 ConsumerRebalanceListener 的实现。 ConsumerRebalanceListener 包含在消费者分配或删除新分区时的回调方法。以下代码示例说明了这一点:

      public class SkillsConsumer {
      
      private String topic;
      
      private KafkaConsumer<String, String> consumer;
      
      private static final int POLL_TIMEOUT = 5000;
      
      public SkillsConsumer(String topic) {
          this.topic = topic;
          Properties properties = ConsumerUtil.getConsumerProperties();
          properties.put("group.id", "consumer-skills");
          this.consumer = new KafkaConsumer<>(properties);
          this.consumer.subscribe(Collections.singletonList(this.topic),
                  new PartitionOffsetAssignerListener(this.consumer));
          }
      }
      
      public class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
      
      private KafkaConsumer consumer;
      
      public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) {
          this.consumer = kafkaConsumer;
      }
      
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      
      }
      
      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          //reading all partitions from the beginning
          for(TopicPartition partition : partitions)
              consumer.seekToBeginning(partition);
      }
      

      }

      现在,只要将分区分配给消费者,就会从头开始读取每个分区。

      【讨论】:

      • 这是最干净的解决方案恕我直言。作为一个微小的改进:onPartitionsAssigned 方法中的for-loop 可以替换为consumer.seekToBeginning(partitions)(在我的测试中适用于 kafka-2.1.0)。
      【解决方案7】:

      所以对我来说,有效的是上面建议的组合。关键的变化是包括

      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      

      并且每次都有一个随机生成的 GROUP ID。但仅此一项对我不起作用。出于某种原因,我第一次对消费者进行调查时,它从未得到任何记录。我必须破解它才能让它工作 -

      consumer.poll(0); // without this the below statement never got any records
      final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
      

      我是 KAFKA 的新手,不知道为什么会发生这种情况,但对于仍在尝试使其正常工作的其他人,希望这会有所帮助。

      【讨论】:

      • 每次我想从主题中获取所有数据时,我都不得不采用这种方法来获取所有数据。
      【解决方案8】:
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      

      如果您只是避免保存任何偏移量,消费者将始终在开始时重置。

      【讨论】:

      • 正如反复记录的那样,这还不够:仅在未找到提交时才使用偏移重置提交,但是由于将自动提交设置为 false 意味着您正在使用提交 API,因此提交仍然会发生消息被阅读。 docs.confluent.io/platform/current/clients/consumer.html
      【解决方案9】:

      另一种选择是让您的消费者代码保持简单,并使用 Kafka 附带的命令行工具 kafka-consumer-groups 从外部引导偏移管理。

      每次在启动消费者之前,你都会调用

      bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
       --execute --reset-offsets \
       --group myConsumerGroup \
       --topic myTopic \
       --to-earliest
      

      根据您的要求,您可以使用该工具重置主题的每个分区的偏移量。帮助功能或documentation解释选项:

      --reset-offsets also has following scenarios to choose from (atleast one scenario must be selected):
      
      --to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
      --to-earliest : Reset offsets to earliest offset.
      --to-latest : Reset offsets to latest offset.
      --shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
      --from-file : Reset offsets to values defined in CSV file.
      --to-current : Resets offsets to current offset.
      --by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
      --to-offset : Reset offsets to a specific offset.
      

      【讨论】:

        【解决方案10】:

        始终从偏移量 0 读取,而不是每次都创建新的 groupId。

            // ... Assuming the props have been set properly.
            // ... enable.auto.commit and auto.offset.reset as default
        
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topic));
            consumer.poll(0);  // without this, the assignment will be empty. 
            consumer.assignment().forEach(t -> {
                System.out.printf("Set %s to offset 0%n", t.toString());
                consumer.seek(t, 0);
            });
            while (true) {
             // ... consumer polls messages as usual.
            }
        

        【讨论】:

          【解决方案11】:

          这是我从头开始读取消息的代码(使用 Java 11)

          try (var consumer = new KafkaConsumer<String, String>(config)) {
               
                  consumer.subscribe(Set.of(topic), new ConsumerRebalanceListener() {
                      @Override
                      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                      }
          
                      @Override
                      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                          consumer.seekToBeginning(partitions);
                      }
                  });
                  // polling messages
          }
          

          您可以在此处查看完整的代码示例:

          https://gist.github.com/vndung/4c9527b3aeafec5d3245c7a3b921f8b1

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2022-10-04
            • 1970-01-01
            • 2015-08-31
            • 1970-01-01
            • 2022-10-14
            • 2020-05-07
            • 2014-08-30
            相关资源
            最近更新 更多