【问题标题】:Gets an error error in Kafka producer when creating topic but the topic is created on the Kafka server创建主题时在 Kafka 生产者中获取错误错误,但主题是在 Kafka 服务器上创建的
【发布时间】:2017-07-18 12:55:36
【问题描述】:

我正在使用 Kafka producer 10.2.1 创建主题并写入主题,当我创建主题时出现以下错误,但主题已创建:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:774)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:494)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
    at kafka.AvroProducer.produce(AvroProducer.java:47)
    at samples.TestMqttSource.messageReceived(TestMqttSource.java:89)
    at mqtt.JsonConsumer.messageArrived(JsonConsumer.java:132)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:477)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:380)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:184)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
msg org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
loc org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
cause org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
excep java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

非常感谢所有建议。

【问题讨论】:

  • 生产者无法创建主题。 Admin Client API 可以做到这一点。创建主题是因为代理上启用了主题自动创建(auto.create.topics.enable 属性)(默认情况下)。你能显示代码吗?
  • 感谢您的评论。我找到了解决方案,并试图在下面的答案的评论中解释“问题”。

标签: java timeout apache-kafka


【解决方案1】:

您不能使用 KafkaProducer 创建主题 (所以我不太确定您是如何创建主题的,除非您之前通过其他方法(例如 kafka 管理 shell 脚本)创建)。相反,您使用 Kafka 库提供的 AdminUtils。

我最近实现了您所追求的两个要求,您会惊讶于它是多么容易实现。下面是一个简单的代码示例,向您展示如何通过 AdminUtils 创建主题,以及如何写入主题。

class Foo {

    private String TOPIC = "testingTopic";
    private int NUM_OF_PARTITIONS = 10;
    private int REPLICATION_FACTOR = 1;

    public Foo() {


        ZkClient zkClient = new ZkClient( "localhost:2181", 15000, 10000, ZKStringSerializer$.MODULE$ );
        ZkUtils zkUtils = new ZkUtils( zkClient, new ZkConnection( "localhost:2181" ), false);

        if ( !AdminUtils.topicExists(zkUtils, TOPIC) ) {
            try {
                AdminUtils.createTopic(zkUtils, TOPIC, NUM_OF_PARTITIONS, REPLICATION_FACTOR, new Properties(), Enforced$.MODULE$);

                Properties producerConfig = new Properties();

                producerConfig.put(ProducerConfig.BOOTSTRAP_SERVER_CONFIG, "localhost:9092");
                producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

                KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

                // This is just to show you how to write but you could be more elaborate
                int i = 0;

                while ( i < 11 ) {
                    ProducerRecord<String, String> rec = new ProducerRecord<>(TOPIC, ("This is line number " + i));
                    producer.send(rec);
                    i++;
                }

                producer.closer();
            } catch ( AdminOperationException aoe ) {
                aoe.printStackTrace();
            }
        }

    }

}

请记住,如果您想删除主题,默认情况下在设置中禁用此功能。启动 Kafka 时使用的配置文件(默认为 ${kafka_home}/config/server.properties),如果它不存在并设置为 false 或已注释,则添加以下行出:

delete.topic.enabled=true

然后您必须重新启动服务器,并可以通过 Java 或提供的命令行工具删除主题。

注意

当您完成生产者/消费者时关闭它们总是一个好主意,如代码示例所示。

【讨论】:

  • 感谢您的回答和评论。我们的 Kafka 设置为从消息中自动生成主题。我发现了问题所在,我在 Windows PC 上运行客户端,并且在连接到 Kafka 时使用了 Kafka 服务器的 IP 地址,但我认为在主题创建的元数据中,返回kafkaserver名称,所以我将Kafka服务器的IP地址和主机名添加到我的主机文件中,它可以工作。
  • 啊,好吧,发生这种情况有点不寻常!但很高兴一切正常
猜你喜欢
  • 2018-07-27
  • 1970-01-01
  • 1970-01-01
  • 2017-07-21
  • 1970-01-01
  • 2019-11-04
  • 1970-01-01
  • 2022-12-22
相关资源
最近更新 更多