【问题标题】:Fetching and Updating Kafka Topic Configurations through Java API通过 Java API 获取和更新 Kafka 主题配置
【发布时间】:2017-04-04 11:28:03
【问题描述】:

我正在编写一个应用程序来通过 Java API 对一个 kafka 主题执行一系列操作。我能够创建主题并添加分区。我在获取主题元数据(例如分区、代理)和配置以及更新配置方面需要帮助。

作为参考,我想更新此处提供的主题级别配置 -https://kafka.apache.org/documentation#configuration,例如 cleanup.policy、compression.type 等

【问题讨论】:

    标签: apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    将我的回答链接到类似问题,这是如何在 Java 中获取主题的配置。 https://stackoverflow.com/questions/57272855/apache-kafka-client-java-list-topics-and-check-whether-topic-is-log-compacte/57895165#57895165[1]

    要点是您需要创建一个AdminClient,然后是ConfigResource 属性的集合,调用describeConfigs 方法并解析并循环遍历DescribeConfigsResult 以找到您要查找的特定配置。

    这是代码,更多解释在我上面链接的答案中。

    import org.apache.kafka.clients.admin.AdminClient;
    import java.util.*;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.clients.admin.DescribeConfigsResult;
    import org.apache.kafka.common.config.ConfigResource;
    
    admin = AdminClient.create(properties);
    
    Collection<ConfigResource> cr = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC,"sepoc.ccc4s-check-request-v1"));
    DescribeConfigsResult ConfigsResult = client.describeConfigs(cr);
    Config all_configs = (Config) ConfigsResult.all().get().values().toArray()[0];
    
    for (ConfigEntry currentConfig : all_configs.entries()) {
      if (currentConfig.name().equals("compression.type")) {
        log.info("compression.type: {}", currentConfig.value());
      }
    }
    

    【讨论】:

      【解决方案2】:

      您可以使用下面的代码来打印主题级别的配置。更新配置的用法类似。

      String[] args = {"--zookeeper", "localhost:2181", "--entity-type", "topics", "--entity-name", "test", "--describe"};

      ConfigCommand.main(args);

      关于获取元数据,请参考https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example中的Finding the Lead Broker for a Topic and Partition

      添加:使用 AdminUtils 添加配置获取和更新示例:

      ZkUtils zkUtils = ZkUtils.apply("localhost:2181/k1", 6000, 10000, JaasUtils.isZkSecurityEnabled());
      
          Properties pp = new Properties();
          pp.setProperty("delete.retention.ms", "3000000");
          pp.setProperty("file.delete.delay.ms", "40000");
          AdminUtils.changeTopicConfig(zkUtils, "test", pp);
          Properties p = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test");
          System.out.println(p);
      

      【讨论】:

      • 我正在尝试通过 Kafka Admin Utils API 实现结果
      猜你喜欢
      • 2016-05-03
      • 1970-01-01
      • 2016-03-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-18
      • 1970-01-01
      • 2018-05-18
      相关资源
      最近更新 更多