【发布时间】:2019-09-12 02:48:49
【问题描述】:
背景
我们公司有由 Zookeeper 管理的 Apache Kafka。我们的 Spring Boot 应用程序之一需要检查所有可用主题的列表,并列出哪些主题启用了日志压缩(cleanup.policy=compact)。
当前代码
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
...
...
public List<String> getTopics() {
Map<String, List<PartitionInfo>> topics = consumerFactory().createConsumer().listTopics();
List<String> topicList = new ArrayList<>();
topics.keySet().remove(CONSUMER_OFFSETS);
topicList.addAll(topics.keySet());
return topicList;
}
问题
通过上面的代码,应用程序可以获得主题列表。有没有办法也可以知道各个主题是否被日志压缩?我正在寻找的是某种“Java”方式来获得与从终端运行以下 Apache Kafka CLI 命令时获得的相同响应。
kafka-topics --zookeeper localhost:2181 --describe --topic TestTopicCompact
这是一个例子
Topic:TestTopicCompact PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: TestTopicCompact Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
【问题讨论】:
标签: java apache-kafka