【问题标题】:How to get topic list from kafka server in Java如何从 Java 中的 kafka 服务器获取主题列表
【发布时间】:2014-09-14 03:42:23
【问题描述】:

我使用的是kafka 0.8 版本,而且非常新。

我想知道kafka server 中创建的主题列表及其 元数据。 是否有任何 API 可用于找出这一点?

基本上,我需要编写一个Java使用者,它应该自动发现kafka server中的任何主题。有API可以获取TopicMetadata,但这需要主题名称作为输入 参数。我需要服务器中所有主题的信息。

【问题讨论】:

  • 如果你正在寻找java api,不幸的是目前除了你提到的那个之外没有其他的

标签: java apache-kafka


【解决方案1】:

使用 Kafka 0.9.0

您可以使用提供的消费者方法 listTopics() 列出服务器中的主题;

例如。

Map<String, List<PartitionInfo> > topics;

Properties props = new Properties();
props.put("bootstrap.servers", "1.2.3.4:9092");
props.put("group.id", "test-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
topics = consumer.listTopics();
consumer.close();

【讨论】:

    【解决方案2】:

    我认为这是最好的方法:

    ZkClient zkClient = new ZkClient("zkHost:zkPort");
    List<String> topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient));
    

    【讨论】:

    • 经过简单一点的0.9.x版
    • 谢谢,这正是我想要的!
    【解决方案3】:

    Kafka 附带的示例 shell 脚本是一个很好的起点。 在发行版的 /bin 目录中有一些你可以使用的 shell 脚本,其中之一是 ./kafka-topic-list.sh 如果您在不指定主题的情况下运行它,它将返回所有主题及其元数据。 看: https://github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh

    该 shell 脚本依次运行: https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala

    以上都是对0.8 Kafka版本的引用,所以如果你使用的是不同的版本(甚至是一点不同),一定要在github上使用相应的分支/标签

    【讨论】:

    • 这些可以从windows上运行的java应用远程执行吗?
    【解决方案4】:

    使用 Scala:

    import java.util.{Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    
    object KafkaTest {
      def main(args: Array[String]): Unit = {
    
        val brokers = args(0)
        val props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
        val consumer = new KafkaConsumer[String, String](props);
        val topics = consumer.listTopics().keySet();
    
        println(topics)
      }
    }
    

    【讨论】:

      【解决方案5】:

      如果你想从 Zookeeper 中提取 broker 或 other-kafka 信息,那么kafka.utils.ZkUtils 提供了一个很好的接口。这是我必须列出所有 Zookeeper 代理的代码(那里还有很多其他方法):

      List<Broker> listBrokers() {
      
              final ZkConnection zkConnection = new ZkConnection(connectionString);
              final int sessionTimeoutMs = 10 * 1000;
              final int connectionTimeoutMs = 20 * 1000;
              final ZkClient zkClient = new ZkClient(connectionString,
                                                     sessionTimeoutMs,
                                                     connectionTimeoutMs,
                                                     ZKStringSerializer$.MODULE$);
      
              final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
      
              scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
      }
      

      【讨论】:

      • OP 要求提供主题列表而不是经纪人
      【解决方案6】:

      您可以使用 zookeeper API 获取如下所述的 broker 列表:

          ZooKeeper zk = new ZooKeeper("zookeeperhost, 10000, null);
          List<String> ids = zk.getChildren("/brokers/ids", false);
          List<Map> brokerList = new ArrayList<>();
          ObjectMapper objectMapper = new ObjectMapper();
      
          for (String id : ids) {
              Map map = objectMapper.readValue(zk.getData("/brokers/ids/" + id, false, null), Map.class);
              brokerList.add(map);
          }
      

      使用此代理列表通过以下链接获取所有主题

      https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader

      【讨论】:

      • OP 询问主题列表而不是经纪人
      • 你得到broker列表,然后得到topic,上面的方法有什么问题。
      • 只是 sn-p 没有提供答案,并且链接的功能比所要求的要多。最好将解决方案嵌入您的答案中,而不是链接。
      猜你喜欢
      • 1970-01-01
      • 2023-03-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-29
      • 2015-08-01
      • 1970-01-01
      • 2019-02-10
      相关资源
      最近更新 更多