【问题标题】:How to create a Topic in Kafka through Java如何通过 Java 在 Kafka 中创建主题
【发布时间】:2015-01-18 03:45:15
【问题描述】:

我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符中创建一个主题,并且如果我通过 java api 推送消息,它工作正常。但我想通过java api创建一个主题。经过长时间的搜索,我找到了下面的代码,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

我尝试了上面的代码,它显示主题已创建,但我无法在主题中推送消息。我的代码有什么问题吗?或者任何其他方式来实现上述目标?

【问题讨论】:

    标签: java apache-zookeeper apache-kafka


    【解决方案1】:
    public static void create(String name) {
       AdminClient client = AdminClient.create(properties());
       NewTopic topic = new NewTopic(
          name,
          (int)conf().get("partition"),
          Short.parseShort(String.valueOf(conf().get("replication.factor"))));
    
       client.createTopics(Collections.singleton(topic));
       client.close();
    }
    

    【讨论】:

      【解决方案2】:

      编辑 - 较新版本的 Kafka 不需要 Zookeeper。有关 API 版本 0.11.0+,请参阅 @Neeleshkumar Srinivasan Mannur 的回答



      原答案

      我修好了..经过长时间的研究..

      ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
      AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
      

      从上面的代码中,ZkClient 会创建一个主题,但是这个主题信息对于 kafka 是没有感知的。所以我们要做的是,我们需要通过以下方式为 ZkClient 创建对象,

      首先导入下面的语句,

      import kafka.utils.ZKStringSerializer$;
      

      并通过以下方式为 ZkClient 创建对象,

      ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
      AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
      

      编辑 1:(用于@ajkret 评论)

      上述代码不适用于 kafka > 0.9,因为 api 已更改, 将以下代码用于 kafka > 0.9


      import java.util.Properties;
      import kafka.admin.AdminUtils;
      import kafka.utils.ZKStringSerializer$;
      import kafka.utils.ZkUtils;
      import org.I0Itec.zkclient.ZkClient;
      import org.I0Itec.zkclient.ZkConnection;
      
      public class KafkaTopicCreationInJava
      {
          public static void main(String[] args) throws Exception {
              ZkClient zkClient = null;
              ZkUtils zkUtils = null;
              try {
                  String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
                  int sessionTimeOutInMs = 15 * 1000; // 15 secs
                  int connectionTimeOutInMs = 10 * 1000; // 10 secs
      
                  zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
                  zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
      
                  String topicName = "testTopic";
                  int noOfPartitions = 2;
                  int noOfReplication = 3;
                  Properties topicConfiguration = new Properties();
      
                  AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
      
              } catch (Exception ex) {
                  ex.printStackTrace();
              } finally {
                  if (zkClient != null) {
                      zkClient.close();
                  }
              }
          }
      }
      

      【讨论】:

      • 是的,向主题发送消息是可能的,但是消费者能得到消息吗?有什么想法吗?
      • 您是通过低级消费者还是高级消费者来消费?能否请您提供一些代码。
      • 0.9.0.1之后界面完全变了。 createTopic 不再接受 ZkClient。
      • AdminUtils 已被 AdminZkClient 弃用
      • zkUtils 现已在 Kafka 2.12-2.1.0 中弃用。
      【解决方案3】:

      API 0.11.0+ 中的过程似乎大大简化了。使用它,可以如下完成

      import org.apache.kafka.clients.admin.AdminClient;
      import org.apache.kafka.clients.admin.CreateTopicsResult;
      import org.apache.kafka.clients.admin.NewTopic;
      
      Properties properties = new Properties();
      properties.load(new FileReader(new File("kafka.properties")));
      
      AdminClient adminClient = AdminClient.create(properties);
      NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)
      
      List<NewTopic> newTopics = new ArrayList<NewTopic>();
      newTopics.add(newTopic);
      
      adminClient.createTopics(newTopics);
      adminClient.close();
      

      kafka.properties文件内容如下

      bootstrap.servers=localhost:9092
      group.id=test
      enable.auto.commit=true
      auto.commit.interval.ms=1000
      key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
      

      请注意,必须关闭 AdminClient 的实例才能反映新创建的主题。

      【讨论】:

      • 感谢您的澄清。这是在哪里记录的?
      • 不要认为它一定需要关闭,基本上createTopics 似乎是返回 Future 类型对象的惰性方法 - 要强制执行,您可以尝试“获取”响应它
      【解决方案4】:

      AdminUtils API 即将被弃用。我们可以使用新的 API AdminZkClient 来管理 Kafka 服务器中的主题。

      String zookeeperHost = "127.0.0.1:2181";
      Boolean isSucre = false;
      int sessionTimeoutMs = 200000;
      int connectionTimeoutMs = 15000;
      int maxInFlightRequests = 10;
      Time time = Time.SYSTEM;
      String metricGroup = "myGroup";
      String metricType = "myType";
      KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                      connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);
      
      AdminZkClient adminZkClient = new AdminZkClient(zkClient);
      
      String topicName1 = "myTopic";
      int partitions = 3;
      int replication = 1;
      Properties topicConfig = new Properties();
      
      adminZkClient.createTopic(topicName1,partitions,replication,
                  topicConfig,RackAwareMode.Disabled$.MODULE$);
      

      详情可参考此链接:https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/

      【讨论】:

      • AdminZkClient 类的文档 -> This is an internal class and no compatibility guarantees are provided, see org.apache.kafka.clients.admin.AdminClient for publicly supported APIs,目前的正确答案在 Neeleshkumar Srinivasan Mannur's answer
      【解决方案5】:

      对于那些试图在 kafka v0.10.2.1 中实现这一点并遇到序列化错误“java.io.StreamCorruptedException: invalid stream header: 3139322E”问题的人,下面是一个带有必要导入的示例工作代码。

      import org.I0Itec.zkclient.ZkClient;
      import org.I0Itec.zkclient.ZkConnection;
      import org.I0Itec.zkclient.exception.ZkMarshallingError;
      import org.I0Itec.zkclient.serialize.ZkSerializer;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.PartitionInfo;
      
      import kafka.admin.AdminUtils;
      import kafka.admin.RackAwareMode;
      import kafka.utils.ZKStringSerializer;
      import kafka.utils.ZkUtils;
      
      public static void createTopic(String topicName, int numPartitions, int numReplication) {
              ZkClient zkClient = null;
              ZkUtils zkUtils = null;
              try {
                  String zookeeperHosts = "199.98.916.902:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
                  int sessionTimeOutInMs = 15 * 1000; // 15 secs
                  int connectionTimeOutInMs = 10 * 1000; // 10 secs
      
                  zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs);
                  //Ref: https://gist.github.com/jjkoshy/3842975
                  zkClient.setZkSerializer(new ZkSerializer() {
                      @Override
                      public byte[] serialize(Object o) throws ZkMarshallingError {
                          return ZKStringSerializer.serialize(o);
                      }
      
                      @Override
                      public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                          return ZKStringSerializer.deserialize(bytes);
                      }
                  });
      
                  zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
      
                  int noOfPartitions = 2;
                  int noOfReplication = 3;
                  Properties topicConfiguration = new Properties();
      
                  AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration,
                          RackAwareMode.Enforced$.MODULE$);
      
              } catch (Exception ex) {
                  ex.printStackTrace();
              } finally {
                  if (zkClient != null) {
                      zkClient.close();
                  }
              }
          }
      

      【讨论】:

        【解决方案6】:

        只是指向任何使用 Kafka 更新版本的人的指针(在撰写本文时,我使用的是 Kafka v0.10.0.0)

        你必须改变;

        AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration);
        

        以下;

        AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, true, Enforced$.MODULE$);
        

        完成后关闭连接也是个好主意;

        zkClient.close();
        

        【讨论】:

          猜你喜欢
          • 2016-03-24
          • 2017-10-18
          • 1970-01-01
          • 2015-03-22
          • 2016-05-03
          • 2016-07-26
          • 2021-09-09
          • 2022-01-15
          相关资源
          最近更新 更多