【问题标题】:Kafka Topic creation from Java API [duplicate]从Java API创建Kafka主题[重复]
【发布时间】:2018-03-31 03:38:42
【问题描述】:

我正在尝试使用 Java API 创建一个 Kafka 主题,但无法获得 LEADER。

代码:

int partition = 0;
        ZkClient zkClient = null;
        try {
            String zookeeperHosts = "localhost:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);

            String topicName = "mdmTopic5";
            int noOfPartitions = 2;
            int noOfReplication = 1;
            Properties topicConfiguration = new Properties();
            AdminUtils.createTopic(zkClient, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }

错误:

[2017-10-19 12:14:42,263] WARN Error while fetching metadata with correlation id 1 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,370] WARN Error while fetching metadata with correlation id 3 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,479] WARN Error while fetching metadata with correlation id 4 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Kafka 0.11.0.1 是否支持 AdminUtils.???请告诉我如何在这个版本中创建主题。

提前致谢。

【问题讨论】:

标签: java apache-kafka kafka-producer-api


【解决方案1】:

自 Kafka 0.11 以来,有一个适当的管理 API 用于创建(和删除)主题,我建议使用它而不是直接连接到 Zookeeper。

参见 AdminClient.createTopics():http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/AdminClient.html#createTopics(java.util.Collection)

【讨论】:

    【解决方案2】:

    通常LEADER NOT AVAILABLE 指向网络问题,而不是您的代码问题。 试试:

    telnet host port 看看您是否可以从您的机器连接到所有必需的主机/端口。

    但是,最新的方法是在创建主题时使用BOOTSTRAP_SERVERS

    使用 Scala 的主题创建代码的工作版本如下:

    使用 sbt 导入所需的kafka-clients

    // https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
    libraryDependencies += Seq("org.apache.kafka" % "kafka-clients" % "2.1.1")
    

    scala中创建主题的代码:

    import java.util.Arrays
    import java.util.Properties
    
    import org.apache.kafka.clients.admin.NewTopic
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
    
    class CreateKafkaTopic {
      def create(): Unit = {
        val config = new Properties()
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")
    
        val localKafkaAdmin = AdminClient.create(config)
    
        val partitions = 3
        val replication = 1.toShort
        val topic = new NewTopic("integration-02", partitions, replication)
        val topics = Arrays.asList(topic)
    
        val topicStatus = localKafkaAdmin.createTopics(topics).values()
        //topicStatus.values()
        println(topicStatus.keySet())
      }
    
    }
    

    希望对你有帮助。

    【讨论】:

      猜你喜欢
      • 2017-04-19
      • 2016-05-03
      • 1970-01-01
      • 2018-05-24
      • 1970-01-01
      • 1970-01-01
      • 2016-03-24
      • 1970-01-01
      • 2021-09-09
      相关资源
      最近更新 更多