【发布时间】:2017-02-03 07:08:37
【问题描述】:
正在完成确定 ConsumerLag 的任务,需要按如下方式检索当前的 Producer Offset:
PartitionOffsetRequestInfo partitionOffsetRequestInfo =
new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 100);
List<TopicAndPartition> partitions = new ArrayList<>();
for(int i = 0; i < partitionMetadataList.size(); i++) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, i);
partitions.add(topicAndPartition);
tuple2List.add(new Tuple2<>(topicAndPartition, partitionOffsetRequestInfo));
}
Tuple2<TopicAndPartition, PartitionOffsetRequestInfo>[] tuple2Array =
tuple2List.parallelStream().toArray(Tuple2[]::new);
WrappedArray<Tuple2<TopicAndPartition, PartitionOffsetRequestInfo>> wrappedArray =
Predef.wrapRefArray(tuple2Array);
scala.collection.immutable.Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetRequestInfoMap =
(scala.collection.immutable.Map<TopicAndPartition, PartitionOffsetRequestInfo>)
scala.Predef$.MODULE$.Map().apply(wrappedArray);
OffsetRequest offsetRequest = new OffsetRequest(offsetRequestInfoMap, (short)0,
0, OffsetRequest.DefaultClientId(), Request.OrdinaryConsumerId());
查看 OffsetResponse 时会看到一组 UnknownTopicOrPartitionException offsets。如果我为 versionId 传递 (short)1(就像我对 OffsetFetchResponse 的调用一样),当我尝试检索结果时,我会得到一个 NetworkReceive.readFromReadableChannel 异常。
问题:
一个。有没有更好的方法来获取当前的生产者偏移量?
湾。为什么 OffsetRequest 调用不适用于 VersionId = 1?
编辑:
请注意,我可以使用这个 channel 来检索 ConsumerOffset,所以我知道它有效。
我可以使用 cmdline 检索值:
kafka-consumer-groups --bootstrap-server 主机名:9092 --describe --new-consumer --group test_consumer
编辑:
尝试重用示例 scala(重写为 Java)代码:
KafkaConsumer<String, String> kafkaConsumer = getConsumer();
List<org.apache.kafka.common.TopicPartition>topicAndPartitions = new ArrayList<>();
org.apache.kafka.common.TopicPartition topicAndPartition = new org.apache.kafka.common.TopicPartition("my_topic", 0);
topicAndPartitions.add(topicAndPartition);
kafkaConsumer.assign(topicAndPartitions);
kafkaConsumer.seekToEnd(topicAndPartitions);
long lPos = kafkaConsumer.position(topicAndPartition);
在调用.position() 时遇到相同的异常 (NetworkReceive.readFromReadableChannel)。
【问题讨论】:
-
那你最后的问题是什么?
-
我做错了什么?有一个解决方案 - 我只是还没有找到它。
标签: java scala apache-kafka