【问题标题】:How Can we create a topic in Kafka from the IDE using API我们如何使用 API 从 IDE 在 Kafka 中创建主题
【发布时间】:2013-06-01 13:22:56
【问题描述】:

我们如何使用 API 从 IDE 在 Kafka 中创建主题,因为当我这样做时:

bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181

我得到错误:

bash: bin/kafka-create-topic.sh: No such file or directory

我按照开发人员的设置进行操作。

【问题讨论】:

  • 在运行命令之前,您应该位于 KAFKA_BASE_DIR(安装您的 kafka 的位置,例如 /var/kafka)内。
  • 直接进入bin里面然后写sh kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181。或者 sh ./kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181 bin 前面的一个目录。
  • 你需要按照bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 的描述做一些事情here
  • @Hild 将 bin 添加到 PATH 也可以正常工作

标签: java apache-kafka


【解决方案1】:

基于最新的kafka-client api和Kafka 2.1.1,工作版本代码如下:

使用 sbt 导入最新的 kafka-clients。

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += Seq("org.apache.kafka" % "kafka-clients" % "2.1.1",
"org.apache.kafka" %% "kafka" % "2.1.1")

scala中创建主题的代码:

import java.util.Arrays
import java.util.Properties

import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

class CreateKafkaTopic {
  def create(): Unit = {
    val config = new Properties()
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")

    val localKafkaAdmin = AdminClient.create(config)

    val partitions = 3
    val replication = 1.toShort
    val topic = new NewTopic("integration-02", partitions, replication)
    val topics = Arrays.asList(topic)

    val topicStatus = localKafkaAdmin.createTopics(topics).values()
    //topicStatus.values()
    println(topicStatus.keySet())
  }

}

使用以下方法验证新主题:

./kafka-topics.sh --zookeeper 192.30.1.5:2181 --list

希望它对某人有所帮助。 参考:http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html

【讨论】:

    【解决方案2】:

    我们可以使用AdminZkClient 来管理Kafka服务器中的主题。

    String zookeeperHost = "127.0.0.1:2181";
    Boolean isSucre = false;
    int sessionTimeoutMs = 200000;
    int connectionTimeoutMs = 15000;
    int maxInFlightRequests = 10;
    Time time = Time.SYSTEM;
    String metricGroup = "myGroup";
    String metricType = "myType";
    KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                    connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);
    
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    
    String topicName1 = "myTopic";
    int partitions = 3;
    int replication = 1;
    Properties topicConfig = new Properties();
    
    adminZkClient.createTopic(topicName1,partitions,replication,
                topicConfig,RackAwareMode.Disabled$.MODULE$);
    

    您可以参考此链接了解详细信息 https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/

    【讨论】:

      【解决方案3】:

      从 0.11.0.0 开始,您只需要:

      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>0.11.0.0</version>
      </dependency>
      

      此工件现在包含AdminClient (org.apache.kafka.clients.admin)。

      AdminClient 可以处理许多 Kafka 管理任务,包括创建主题:

      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
      
      AdminClient admin = AdminClient.create(config);
      
      Map<String, String> configs = new HashMap<>();
      int partitions = 1;
      int replication = 1;
      
      admin.createTopics(asList(new NewTopic("topic", partitions, replication).configs(configs)));
      

      此命令的输出是 CreateTopicsResult,您可以使用它为整个操作或每个单独的主题创建获取 Future

      • 要获得整个操作的未来,请使用CreateTopicsResult#all()
      • 要单独获取所有主题的Futures,请使用CreateTopicsResult#values()

      例如:

      CreateTopicsResult result = ...
      KafkaFuture<Void> all = result.all();
      

      或:

      CreateTopicsResult result = ...
      for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
          try {
              entry.getValue().get();
              log.info("topic {} created", entry.getKey());
          } catch (InterruptedException | ExecutionException e) {
              if (Throwables.getRootCause(e) instanceof TopicExistsException) {
                  log.info("topic {} existed", entry.getKey());
              }
          }
      }
      

      KafkaFuture 是“一个支持调用链和其他异步编程模式的灵活未来”,并且“最终将成为 Java 8 的CompletebleFuture 之上的一个薄垫片。”

      【讨论】:

      • 永远挂起 :-(
      【解决方案4】:

      从 Kafka 0.10.1 开始,Michael 提到的 ZKStringSerializer 是私有的(对于 Scala)。您可以使用 ZkUtils 中的工厂方法 createZkClient 或 createZkClientAndConnection。

      Kafka 0.10.1 的 Scala 示例:

      import kafka.utils.ZkUtils
      
      val sessionTimeoutMs = 10000
      val connectionTimeoutMs = 10000
      val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(
        "localhost:2181", sessionTimeoutMs, connectionTimeoutMs) 
      

      然后按照 Michael 的建议创建主题:

      import kafka.admin.AdminUtils
      
      val zkUtils = new ZkUtils(zkClient, zkConnection, false)
      val numPartitions = 4
      val replicationFactor = 1
      val topicConfig = new Properties
      val topic = "my-topic"
      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)
      

      【讨论】:

        【解决方案5】:

        在 Kafka 0.8.1+(截至目前最新版本的 Kafka)中,您可以通过 AdminCommand 以编程方式创建新主题。在此问题的先前答案之一中提到的CreateTopicCommand(旧版 Kafka 0.8.0 的一部分)的功能已移至AdminCommand

        Kafka 0.8.1 的 Scala 示例:

        import kafka.admin.AdminUtils
        import kafka.utils.ZKStringSerializer
        import org.I0Itec.zkclient.ZkClient
        
        // Create a ZooKeeper client
        val sessionTimeoutMs = 10000
        val connectionTimeoutMs = 10000
        // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
        // createTopic() will only seem to work (it will return without error).  The topic will exist in
        // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
        // topic.
        val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
            ZKStringSerializer)
        
        // Create a topic named "myTopic" with 8 partitions and a replication factor of 3
        val topicName = "myTopic"
        val numPartitions = 8
        val replicationFactor = 3
        val topicConfig = new Properties
        AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)
        

        构建依赖,以 sbt 为例:

        libraryDependencies ++= Seq(
          "com.101tec" % "zkclient" % "0.4",
          "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
            exclude("javax.jms", "jms")
            exclude("com.sun.jdmk", "jmxtools")
            exclude("com.sun.jmx", "jmxri"),
          ...
        )
        

        编辑:为 Kafka 0.9.0.0 添加了 Java 示例(截至 2016 年 1 月的最新版本)。

        Maven 依赖:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.9.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.7</version>
        </dependency>
        

        代码:

        import org.I0Itec.zkclient.ZkClient;
        import org.I0Itec.zkclient.ZkConnection;
        
        import java.util.Properties;
        
        import kafka.admin.AdminUtils;
        import kafka.utils.ZKStringSerializer$;
        import kafka.utils.ZkUtils;
        
        public class KafkaJavaExample {
        
          public static void main(String[] args) {
            String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
            int sessionTimeoutMs = 10 * 1000;
            int connectionTimeoutMs = 8 * 1000;
            // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
            // createTopic() will only seem to work (it will return without error).  The topic will exist in
            // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
            // topic.
            ZkClient zkClient = new ZkClient(
                zookeeperConnect,
                sessionTimeoutMs,
                connectionTimeoutMs,
                ZKStringSerializer$.MODULE$);
        
            // Security for Kafka was added in Kafka 0.9.0.0
            boolean isSecureKafkaCluster = false;
            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
        
            String topic = "my-topic";
            int partitions = 2;
            int replication = 3;
            Properties topicConfig = new Properties(); // add per-topic configurations settings here
            AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
            zkClient.close();
          }
        
        }
        

        编辑 2:为 Kafka 0.10.2.0 添加了 Java 示例(截至 2017 年 4 月的最新版本)。

        Maven 依赖:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.9</version>
        </dependency>
        

        代码:

        import org.I0Itec.zkclient.ZkClient;
        import org.I0Itec.zkclient.ZkConnection;
        
        import java.util.Properties;
        
        import kafka.admin.AdminUtils;
        import kafka.admin.RackAwareMode;
        import kafka.utils.ZKStringSerializer$;
        import kafka.utils.ZkUtils;
        
        public class KafkaJavaExample {
        
          public static void main(String[] args) {
            String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
            int sessionTimeoutMs = 10 * 1000;
            int connectionTimeoutMs = 8 * 1000;
        
            String topic = "my-topic";
            int partitions = 2;
            int replication = 3;
            Properties topicConfig = new Properties(); // add per-topic configurations settings here
        
            // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
            // createTopic() will only seem to work (it will return without error).  The topic will exist in
            // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
            // topic.
            ZkClient zkClient = new ZkClient(
                zookeeperConnect,
                sessionTimeoutMs,
                connectionTimeoutMs,
                ZKStringSerializer$.MODULE$);
        
            // Security for Kafka was added in Kafka 0.9.0.0
            boolean isSecureKafkaCluster = false;
        
            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
            AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
            zkClient.close();
          }
        
        }
        

        【讨论】:

        • 请注意,当不使用 ZKStringSerializer 初始化 ZkClient 时,createTopic 将无错误返回。该主题会存在于zookeeper中,并在列出主题时返回,但Kafka本身并不创建该主题。
        • @quux00 您可以使用我编写的以下代码轻松创建它:ZkClient zkClient = new ZkClient(zkServer); zkClient.setZkSerializer(ZKStringSerializer$.MODULE$); 请参考:source code for zkClient link
        【解决方案6】:

        您正在尝试使用哪个 IDE?

        请提供完整路径,下面是来自终端的命令,它将创建一个主题

        1. cd kafka/bin
        2. ./kafka-create-topic.sh --topic test --zookeeper localhost:2181

        【讨论】:

          【解决方案7】:

          如果您使用的是 Kafka 0.10.0.0+,从 Java 创建主题需要传递 RackAwareMode 类型的参数。这是一个 Scala 案例对象,从 Java 中获取它的实例很棘手(例如证明:How do I "get" a Scala case object from Java?。但它不适用于我们的案例)。

          幸运的是,rackAwareMode 是一个可选参数。然而 Java 不支持可选参数。我们如何解决这个问题?这是一个解决方案:

          AdminUtils.createTopic(zkUtils, topic, 1, 1, 
              AdminUtils.createTopic$default$5(),
              AdminUtils.createTopic$default$6());
          

          将它与 miguno 的答案一起使用,你就可以开始了。

          【讨论】:

            【解决方案8】:

            从 Kafka 0.8 Producer Example 开始,下面的示例将创建一个名为 page_visits 的主题,并且如果在 Kafka Broker config 文件中将 auto.create.topics.enable 属性设置为 true(默认),也会开始生成

            import java.util.*;
            
            import kafka.javaapi.producer.Producer;
            import kafka.producer.KeyedMessage;
            import kafka.producer.ProducerConfig;
            
            public class TestProducer {
                public static void main(String[] args) {
                    long events = Long.parseLong(args[0]);
                    Random rnd = new Random();
            
                    Properties props = new Properties();
                    props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
                    props.put("serializer.class", "kafka.serializer.StringEncoder");
                    props.put("partitioner.class", "example.producer.SimplePartitioner");
                    props.put("request.required.acks", "1");
            
                    ProducerConfig config = new ProducerConfig(props);
            
                    Producer<String, String> producer = new Producer<String, String>(config);
            
                    for (long nEvents = 0; nEvents < events; nEvents++) { 
                        long runtime = new Date().getTime();  
                        String ip = “192.168.2.” + rnd.nextInt(255); 
                        String msg = runtime + “,www.example.com,” + ip; 
                        KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
                        producer.send(data);
                    }
                    producer.close();
               }
            }
            

            【讨论】:

            • 我相信您知道从技术上讲,这并不像 OP 所要求的那样通过 API 创建主题。它只是使用auto.create.topics.enable 功能在引用不存在的主题时创建一个主题。并且使用这种方法,除了在代理配置文件中默认硬编码之外,没有其他方法可以指定主题设置。
            【解决方案9】:

            您可以尝试使用 kafka.admin.CreateTopicCommand scala 类从 Java 代码创建主题...提供必要的参数。

            String [] arguments = new String[8];
            arguments[0] = "--zookeeper";
            arguments[1] = "10.***.***.***:2181";
            arguments[2] = "--replica";
            arguments[3] = "1";
            arguments[4] = "--partition";
            arguments[5] = "1";
            arguments[6] = "--topic";
            arguments[7] = "test-topic-Biks";
            
            CreateTopicCommand.main(arguments);
            

            注意:您应该为 jopt-simple-4.5zkclient-0.1 添加 maven 依赖项

            【讨论】:

              【解决方案10】:

              要通过 java api 和 Kafka 0.8+ 创建主题,请尝试以下操作,

              首先导入下面的语句

              import kafka.utils.ZKStringSerializer$;
              

              通过以下方式为ZkClient创建对象,

              ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
              AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
              

              【讨论】:

                【解决方案11】:

                您的电话无法正常工作的几种方式。

                1. 如果您的 Kafka 集群没有足够的节点来支持复制值 3。

                2. 如果有 chroot 路径前缀,则必须在 zookeeper 端口后附加它

                3. 运行时你不在 Kafka 安装目录中(这很有可能)

                【讨论】:

                  猜你喜欢
                  • 1970-01-01
                  • 2021-07-21
                  • 2021-09-09
                  • 2018-03-31
                  • 1970-01-01
                  • 1970-01-01
                  • 2016-07-26
                  • 1970-01-01
                  • 1970-01-01
                  相关资源
                  最近更新 更多