【问题标题】:Kafka create topic by using TopicCommandKafka 使用 TopicCommand 创建主题
【发布时间】:2019-01-14 02:12:14
【问题描述】:

我想用java创建一个主题。有我的代码。

String s = "--topic pt8 --create --zookeeper 10.11.6.52:2181 --replica-assignment 7";
String[] args2 = s.split(" ");
TopicCommand.main(args2);

但是有一个错误:

[ZkClient-EventThread-14-10.11.6.52:2181] INFO o.I.z.ZkEventThread - 启动 ZkClient 事件线程。

[main] INFO o.I.z.ZkClient - 等待 keeper 状态 SyncConnected [main-EventThread] INFO o.I.z.ZkClient - zookeeper 状态改变 (同步连接)

执行主题命令时出错: java.lang.ExceptionInInitializerError

[ZkClient-EventThread-14-10.11.6.52:2181] 信息 o.I.z.ZkEventThread - 终止 ZkClient 事件线程。

--list --zookeeper 10.11.6.52:2181可以得到结果。 --delete --zookeeper 10.11.6.52:2181 --topic pt7 得到 Error while executing topic command : null

我的 pom.xml:

        <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>

使用管理员:

ZkClient  zkClient = new ZkClient("10.11.6.52:2181", 30000, 30000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
AdminUtils.createTopic(zkUtils, "pt8", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

错误:

线程“主”kafka.admin.AdminOperationException 中的异常:java.lang.ExceptionInInitializerError

【问题讨论】:

  • Zookeeper 正在运行吗?
  • 我使用的是集群 Zookeeper IP。 --list --zookeeper 10.11.6.52:2181 工作正常。

标签: java apache-kafka


【解决方案1】:

不要使用 shell 命令并尝试从 JAVA 执行它,而是使用 KAFKA Admin 客户端 API,它应该适用于 Kafka 0.11+。

这是一个代码sn-p:

void setUpKafkaTopics(KafkaAdminClient kafkaAdminClient) throws ExecutionException, InterruptedException {
  final Map<String, Integer> topics = new HashMap<>();
  topics.put(topicName, numOfPartitions);
  kafkaAdminClient.createTopics(topics, getTopicConfig(), replicationFactor);
}


Map<String, String> getTopicConfig() {
  Map<String, String> topicConfiguration = new HashMap<>();
  topicConfiguration.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
     Boolean.FALSE.toString());
  topicConfiguration.put(TopicConfig.CLEANUP_POLICY_CONFIG,
     TopicConfig.CLEANUP_POLICY_DELETE);
  topicConfiguration.put(TopicConfig.COMPRESSION_TYPE_CONFIG,
     KAFKA_TOPIC_COMPRESSION_TYPE);
  topicConfiguration.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
     KAFKA_TOPIC_MIN_IN_SYNC_REPLICAS.toString()); 
  topicConfiguration.put(TopicConfig.RETENTION_MS_CONFIG,
     KAFKA_TOPIC_RETENTION_MS.toString()); 
  return topicConfiguration;
}

【讨论】:

  • 我不认为使用 0.10.2.1 依赖项时可以使用
  • 哦,是的,对不起,我刚刚看到你在使用0.10.2.1,管理员客户进来0.11.x
  • 注意:这不是“我的”问题
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-22
  • 2019-11-08
  • 2018-02-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多