【问题标题】:Apache Kafka Client (Java) : List topics and check whether topic is log compactedApache Kafka Client (Java) : 列出主题并检查主题是否被日志压缩
【发布时间】:2019-09-12 02:48:49
【问题描述】:

背景

我们公司有由 Zookeeper 管理的 Apache Kafka。我们的 Spring Boot 应用程序之一需要检查所有可用主题的列表,并列出哪些主题启用了日志压缩(cleanup.policy=compact)。

当前代码


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

...
...

    public List<String> getTopics() {
        Map<String, List<PartitionInfo>> topics = consumerFactory().createConsumer().listTopics();
        List<String> topicList = new ArrayList<>();
        topics.keySet().remove(CONSUMER_OFFSETS);
        topicList.addAll(topics.keySet());
        return topicList;
    }

问题

通过上面的代码,应用程序可以获得主题列表。有没有办法也可以知道各个主题是否被日志压缩?我正在寻找的是某种“Java”方式来获得与从终端运行以下 Apache Kafka CLI 命令时获得的相同响应。

kafka-topics --zookeeper localhost:2181 --describe --topic TestTopicCompact

这是一个例子

Topic:TestTopicCompact  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: TestTopicCompact Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

【问题讨论】:

    标签: java apache-kafka


    【解决方案1】:

    您应该使用AdminClient API 来检索该信息。

    • 首先使用listTopics()检索主题列表。
    • 然后使用describeConfigs() 获取每个主题的配置。
    • 最后,在您将获得的 ConfigEntry 对象中,您可以过滤将 compact 作为 cleanup.policy 的主题。

    这基本上就是kafka-topics 工具正在做的事情,所以你可以看看它的源代码kafka.admin.TopicCommand。虽然是 Scala,但概念是相似的。

    【讨论】:

    • 嗯。在我看来(来自 CLI 命令)AdminClient 需要与 Zookeeper 实例挂钩才能获取主题描述。 (当我尝试使用代理列表执行此操作时,我得到以下执行“org.apache.kafka.common.errors.UnsupportedVersionException:代理不支持 DESCRIBE_CONFIGS”)。问题是我在创建 AdminClient 时找不到指定 zookeeper 列表的方法,即没有像 zookeeper.servers 这样的配置属性可以提及,就像 borker list (bootstrap.servers) 一样。
    • 事实上,当我执行“kafka-broker-api-versions --bootstrap-server ”时,我得到以下“DescribeConfigs(32): UNSUPPORTED”。顺便说一下,我使用的客户端版本是 kafka-clients:2.0.1,Kafka 集群版本是 0.10
    • DescribeConfigs 支持已在 0.11 中添加。在 0.11 之前,主题配置仅在 Zookeeper 中可用。我真的建议升级到更新的 Kafka 版本,3 年前发布的 0.10。
    【解决方案2】:

    您首先需要使用属性文件创建一个 kafka 管理客户端。

    import org.apache.kafka.clients.admin.AdminClient;
    import java.util.*;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.clients.admin.DescribeConfigsResult;
    import org.apache.kafka.common.config.ConfigResource;
    
    admin = AdminClient.create(properties);
    

    然后你需要使用 admin.describeConfigs 方法, 此方法将 ConfigResources 集合作为参数,定义如下。

    Collection<ConfigResource> cr =  Collections.singleton( new ConfigResource(ConfigResource.Type.TOPIC, "<Your Topic Name>") 
    
    DescribeConfigsResult ConfigsResult = admin.describeConfigs(cr));
    

    使用 ConfigsResult 然后您需要评估和解析此数据类型,这里我将其强制转换为配置数据类型。

    Config all_configs = (Config)ConfigsResult.all().get().values().toArray()[0];
    

    然后您需要遍历包含许多 ConfigResource 对象的 all_configs 数据类型。 下面的代码构建了一个迭代器循环并找到用于清理策略的配置的配置值,您可以使用相同的逻辑来查找其他字段的配置值。

        Iterator ConfigIterator = all_configs.entries().iterator();
    
        while (ConfigIterator.hasNext()) 
        {
              ConfigEntry currentConfig = (ConfigEntry) ConfigIterator.next();
              if (currentConfig.name().equals("cleanup.policy")) {
                    System.out.println(currentConfig.value());
              }
            }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-01-05
      • 2019-05-14
      • 1970-01-01
      • 1970-01-01
      • 2023-01-18
      • 2017-10-02
      • 1970-01-01
      相关资源
      最近更新 更多