【问题标题】:How can I get the last/end offset of a kafka topic partition?如何获取 kafka 主题分区的最后/结束偏移量?
【发布时间】:2016-11-20 13:23:27
【问题描述】:

我正在使用 Java 编写 kafka 消费者。我想保持消息的实时性,所以如果等待消费的消息太多,比如1000条或更多,我应该放弃未消费的消息,从最后一个偏移量开始消费。

针对这个问题,我尝试比较一个topic(只有1个partition)的最后提交的偏移量和结束偏移量,如果这两个偏移量的差值大于一定的量,我会将最后提交的偏移量设置为主题作为下一个偏移量,以便我可以放弃那些多余的消息。

现在我的问题是如何获取一个话题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    新的消费者也很复杂。

    //assign the topic consumer.assign();

    //seek to end of the topic consumer.seekToEnd();

    //the position is the latest offset consumer.position();

    【讨论】:

    • 如果您只想计算客户端当前偏移量和最新已知的 kafka 主题偏移量之间的差异,这不起作用!
    • @hiaclibe 可能使用getWatermarkOffsets 将返回给定主题和分区的最早和最新偏移量:docs.confluent.io/4.1.0/clients/confluent-kafka-dotnet/api/…
    • 这里需要assign()吗?
    【解决方案2】:

    你也可以使用kafka服务器命令行工具:

    ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name
    

    输出格式为<topicName>:<partitionID>:<offset>,例如t1:0:0,见 https://jaceklaskowski.gitbooks.io/apache-kafka/kafka-tools-GetOffsetShell.html 了解更多详情。

    【讨论】:

      【解决方案3】:

      对于 Kafka 版本:0.10.1.1

      // Get the diff of current position and latest offset
      Set<TopicPartition> partitions = new HashSet<TopicPartition>();
      TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition());
      partitions.add(actualTopicPartition);
      Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition);
      long actualPosition = consumer.position(actualTopicPartition);          
      System.out.println(String.format("diff: %s   (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition));  
      

      【讨论】:

      • lynn提供的方法相比如何?
      • 对其进行了一些测试,对于在 1k 次调用中包含 2.5k 个元素的主题,此解决方案与 lynn 的解决方案之间的改进约为 1.5 ms(分别为 5.1722ms 与 6.7226ms)
      • 从技术上讲,代码将显示下一个要阅读的主题的位置和结束偏移之间的差异。如果您将自动提交设置为 false,那么这不会显示最后一次提交的实际位置,因为最后一次提交可能比下一个要读取的提交更晚。
      【解决方案4】:

      我开发了以下代码来获取偏移状态

      import java.util
      import java.util.{Collections, Properties}
      
      import org.apache.kafka.clients.consumer.KafkaConsumer
      import org.apache.kafka.common.{PartitionInfo, TopicPartition}
      import org.apache.kafka.common.serialization.StringDeserializer
      import scala.collection.JavaConverters._
      
      class GetOffsetRange(consumer:KafkaConsumer[String,String]) {
      
        def getStartOffsetRange(topic:String):util.HashMap[TopicPartition,Long]={
      
          val topicPartitionList = consumer.partitionsFor(topic)
          val partitionMap=new util.HashMap[TopicPartition,Long]()
          val arrTopic=new util.ArrayList[TopicPartition]()
      
          consumer.subscribe(Collections.singletonList(topic));
      
          for(topic<-topicPartitionList.asScala){
            println(topic.topic() +","+topic.partition())
            arrTopic.add(new TopicPartition(topic.topic(),topic.partition()))
          }
      
          consumer.poll(0)
      
          consumer.seekToBeginning(arrTopic)
      
          for(partition <- arrTopic.asScala){
            partitionMap.put(partition,consumer.position(partition)-1)
          }
          return partitionMap
        }
      
        def getEndOffsetRange(topic:String):util.HashMap[TopicPartition,Long]={
      
          val topicPartitionList = consumer.partitionsFor(topic)
          val partitionMap=new util.HashMap[TopicPartition,Long]()
          val arrTopic=new util.ArrayList[TopicPartition]()
      
          consumer.subscribe(Collections.singletonList(topic));
      
          for(topic<-topicPartitionList.asScala){
            println(topic.topic() +","+topic.partition())
            arrTopic.add(new TopicPartition(topic.topic(),topic.partition()))
          }
      
          consumer.poll(0)
      
          consumer.seekToEnd(arrTopic)
      
          for(partition <- arrTopic.asScala){
            partitionMap.put(partition,consumer.position(partition)-1)
          }
          return partitionMap
        }
      }
      

      【讨论】:

        【解决方案5】:

        从 kafka 1.0.1 开始,consumer 有了一个叫 endOffsets 的方法

        public java.util.Map endOffsets(java.util.Collection 分区)

        如果您需要完整代码,请告诉我..

        请参考 apache-kafka-1.0.1-javadoc

        【讨论】:

          【解决方案6】:
          KafkaConsumer<String, String> consumer = ...
          consumer.subscribe(Collections.singletonList(topic));
          TopicPartition topicPartition = new TopicPartition(topic, partition);
          consumer.poll(0);
          consumer.seekToEnd(Collections.singletonList(topicPartition));
          long currentOffset = consumer.position(topicPartition) -1;
          

          sn-p 以上返回给定主题和分区号的当前提交消息偏移量。

          【讨论】:

          • 需要poll(0) 吗?
          • 因为 consumer.poll(0) 已被弃用。您可以使用 consumer.poll(java.time.Duration.ZERO);
          • @DanielAlder 这种方法有点老套,因为没有原生支持。我会建议在您最后测试它以验证它是否需要。此外,行为可能会随着不同的 Kafka 版本而改变。
          猜你喜欢
          • 1970-01-01
          • 2016-05-27
          • 2019-10-06
          • 2019-08-07
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-12-12
          • 1970-01-01
          相关资源
          最近更新 更多