【问题标题】:kafka.api.OffsetRequest - unable to retrieve resultskafka.api.OffsetRequest - 无法检索结果
【发布时间】:2017-02-03 07:08:37
【问题描述】:

正在完成确定 ConsumerLag 的任务,需要按如下方式检索当前的 Producer Offset:

PartitionOffsetRequestInfo partitionOffsetRequestInfo = 
    new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 100);

List<TopicAndPartition> partitions = new ArrayList<>();
for(int i = 0; i < partitionMetadataList.size(); i++) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, i);
    partitions.add(topicAndPartition);

    tuple2List.add(new Tuple2<>(topicAndPartition, partitionOffsetRequestInfo));
}

Tuple2<TopicAndPartition, PartitionOffsetRequestInfo>[] tuple2Array =
    tuple2List.parallelStream().toArray(Tuple2[]::new);

WrappedArray<Tuple2<TopicAndPartition, PartitionOffsetRequestInfo>> wrappedArray =
    Predef.wrapRefArray(tuple2Array);

scala.collection.immutable.Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetRequestInfoMap =
    (scala.collection.immutable.Map<TopicAndPartition, PartitionOffsetRequestInfo>)
    scala.Predef$.MODULE$.Map().apply(wrappedArray);
    
OffsetRequest offsetRequest = new OffsetRequest(offsetRequestInfoMap, (short)0,
    0, OffsetRequest.DefaultClientId(), Request.OrdinaryConsumerId());

查看 OffsetResponse 时会看到一组 UnknownTopicOrPartitionException offsets。如果我为 versionId 传递 (short)1(就像我对 OffsetFetchResponse 的调用一样),当我尝试检索结果时,我会得到一个 NetworkReceive.readFromReadableChannel 异常。

问题:

一个。有没有更好的方法来获取当前的生产者偏移量?
湾。为什么 OffsetRequest 调用不适用于 VersionId = 1?


编辑:

请注意,我可以使用这个 channel 来检索 ConsumerOffset,所以我知道它有效。

我可以使用 cmdline 检索值:

kafka-consumer-groups --bootstrap-server 主机名:9092 --describe --new-consumer --group test_consumer


编辑:

尝试重用示例 scala(重写为 Java)代码:

KafkaConsumer<String, String> kafkaConsumer = getConsumer();
List<org.apache.kafka.common.TopicPartition>topicAndPartitions = new ArrayList<>();

org.apache.kafka.common.TopicPartition topicAndPartition = new org.apache.kafka.common.TopicPartition("my_topic", 0);
topicAndPartitions.add(topicAndPartition);

kafkaConsumer.assign(topicAndPartitions);

kafkaConsumer.seekToEnd(topicAndPartitions);
long lPos = kafkaConsumer.position(topicAndPartition);

在调用.position() 时遇到相同的异常 (NetworkReceive.readFromReadableChannel)。

【问题讨论】:

  • 那你最后的问题是什么?
  • 我做错了什么?有一个解决方案 - 我只是还没有找到它。

标签: java scala apache-kafka


【解决方案1】:

根据源代码,OffsetRequest 的当前版本是 0 而不是 1。此外,源代码不会返回包含时间戳信息的 Version-1 响应,如 doc 所说。所以这可能是一个文档错误。

【讨论】:

    【解决方案2】:

    如果您已经到了这一点,这里有一个可行的解决方案:

    private void getOffsets(String topic, String group) {
        KafkaConsumer<String, String> kafkaConsumer = getConsumer(topic, group);
        List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic);
    
        List<org.apache.kafka.common.TopicPartition>topicAndPartitions = new ArrayList<>();
    
        for(int i = 0; i < partitionInfos.size(); i++) {
            org.apache.kafka.common.TopicPartition topicAndPartition = new org.apache.kafka.common.TopicPartition(topic, i);
            topicAndPartitions.add(topicAndPartition);
        }
    
        List<Long>startList = new ArrayList<>();
        List<Long>endList = new ArrayList<>();
    
        kafkaConsumer.assign(topicAndPartitions);
    
        for(int i = 0; i < partitionInfos.size(); i++) {
            OffsetAndMetadata offsetAndMetadata = kafkaConsumer.committed(topicAndPartitions.get(i));
            if(offsetAndMetadata != null) {
                startList.add(offsetAndMetadata.offset());
            }
        }
    
        // did we find any active partitions?
        if(startList.size() == 0) {
            LOGGER.info("topic:group not found: {}:{}", topic, group);
            return;
        }
    
        kafkaConsumer.seekToEnd(topicAndPartitions);
    
        for(int i = 0; i < partitionInfos.size(); i++) {
            endList.add(i, kafkaConsumer.position(topicAndPartitions.get(i)));
        }
    
        LOGGER.debug("startlist.size: {}  endlist.size: {}  partitions: {}", startList.size(), endList.size(), partitionInfos.size());
    
        long sumLag = 0;
        for(int i = 0; i < partitionInfos.size(); i++) {
            long lStart = startList.get(i);
            long lEnd = endList.get(i);
    
            sumLag += (lEnd - lStart);
    
    /*
     *  At this point Im sending the info to data dog. 
     *  The 'sum' value is nice to have.
     */
            LOGGER.debug("partition: {}  start: {}   end: {}  lag: {}", i, lStart, lEnd, (lEnd - lStart));
        }
    
        kafkaConsumer.poll(100);
    
        topicAndPartitions.clear();
        kafkaConsumer.assign(topicAndPartitions)
    
    }
    

    【讨论】:

      猜你喜欢
      • 2014-08-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-19
      • 2012-12-20
      • 2019-05-15
      • 1970-01-01
      相关资源
      最近更新 更多