【问题标题】:I'm getting "Topic not present in metadata after 60000 ms" message on some computers我在某些计算机上收到“60000 毫秒后元数据中不存在主题”消息
【发布时间】:2020-07-15 00:46:54
【问题描述】:

这是我的程序

package kafkaConsumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.Future;

public class KafkaConsumerExample {
    private final static String INTOPIC = "my-intopic";
    private final static String OUTTOPIC = "my-outtopic";
    private final static String BOOTSTRAP_SERVERS = "192.168.10.10:9092";

    private static Producer<Long, String> createProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerExample");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    private static Consumer<Long, String> createConsumer(String intopic, String bootstrapServers) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerExample");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        // Create the consumer using props.
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(intopic));
        return consumer;
    }

    static boolean run(
            Consumer<Long, String> consumer, Producer<Long, String> producer,
            String inTopic, String outTopic) throws InterruptedException {

        String valueToSend;
        long keyToUse;

        if (consumer == null) {
            Scanner sc = new Scanner(System.in);
            System.out.print("Enter key> ");
            keyToUse = sc.nextLong();
            valueToSend = sc.nextLine();
            System.out.print("Enter value> ");
            valueToSend = sc.nextLine();
        } else {
            Duration delta = Duration.ofSeconds(1);
            ConsumerRecords<Long, String> consumerRecords = consumer.poll(delta);
            while (consumerRecords.count() == 0) {
                consumerRecords = consumer.poll(delta);
            }
            ConsumerRecord<Long, String> record = consumerRecords.iterator().next();
            keyToUse = record.key();
            valueToSend = record.value();
            if (producer != null)
                System.out.println("Got key = " + keyToUse + " and value = " + valueToSend);
        }

        if (producer == null) {
            System.out.println("key = " + keyToUse + " and value = " + valueToSend);
        } else {
            try {
                System.out.println("Creating ProducerRecord");
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(outTopic, keyToUse, valueToSend);
                System.out.println("Calling producer.send");
                Future<RecordMetadata> sent = producer.send(record);
                System.out.println("Calling sent.get");
                RecordMetadata metadata = sent.get();
                System.out.println("Calling flush");
                producer.flush();
                System.out.println("After flush");
            } catch (Exception e) {
                System.out.println("Exception sending message: " + e.getMessage());
            }
        }
        return !valueToSend.equals("STOP");
    }

    public static void usage() {
        System.out.println(System.getProperty("sun.java.command"));
        System.out.println();
        System.out.println("Usage parameters: [--intopic name] [--outtopic name] [--bootstrap-servers servers]");
        System.exit(1);
    }

    public static void main(String... args) throws Exception {
        String inTopic = INTOPIC;
        String outTopic = OUTTOPIC;
        String bootstrapServers = BOOTSTRAP_SERVERS;

        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("--intopic")) {
                if (i == args.length - 1) {
                    usage();
                }
                inTopic = args[++i];
            } else if (args[i].equals("--outtopic")) {
                if (i == args.length - 1) {
                    usage();
                }
                outTopic = args[++i];
            } else if (args[i].equals("--bootstrap-servers")) {
                if (i == args.length - 1) {
                    usage();
                }
                bootstrapServers = args[++i];
            } else {
                usage();
            }
        }

        final Consumer<Long, String> consumer;
        if (inTopic.equals("stdin")) {
            consumer = null;
        } else {
            consumer = createConsumer(inTopic, bootstrapServers);
        }
        final Producer<Long, String> producer;
        if (outTopic.equals("stdout")) {
            producer = null;
        } else {
            producer = createProducer(bootstrapServers);
        }

        while (true) {
            if (!run(consumer, producer, inTopic, outTopic)) {
                break;
            }
        }
        if (consumer != null)
            consumer.close();
        if (producer != null)
            producer.close();
    }
}

我在 Windows 和 Linux 上运行它。在某些计算机上它运行良好,但在其他计算机上,特别是不是 kafka 机器的 Linux 机器上,它一直给我这个错误:

Exception sending message: org.apache.kafka.common.errors.TimeoutException: Topic outtopic not present in metadata after 60000 ms.

这当然发生在尝试在run() 函数中发送消息时,特别是在句子RecordMetadata metadata = sent.get() 中。

此 kafka 安装允许自动创建新主题。实际上,如果我在 --outtopic 参数中输入一个新名称,即使发送消息失败,也会创建主题。

任何线索为什么?我在配置中缺少什么?

谢谢

西蒙

【问题讨论】:

  • 不,它不在 docker 容器中。这是一组计算机。

标签: java apache-kafka kafka-producer-api


【解决方案1】:
192.168.10.10:9092

这似乎是一个内部 IP。检查你无法访问的客户端是否在其网络范围内,即是否​​可以访问该IP。

尝试从您的客户端机器上进行 telnet..

telnet 192.168.10.10 9092

如果您无法telnet,则提供您的客户端可以访问的IP,并确保advertised.listeners 中的IP 也相同。

同时检查您的 advertised.listeners 配置。当我们连接到 bootstrap.servers 中给出的 URL 时,通常应该与 advertised.listeners 配置中的 URL 相同。

主题元数据不存在意味着您的客户端无法获取有关给定主题的任何信息,即它无法通过给定的bootstrap.servers 属性获取元数据。

【讨论】:

  • 对不起,我在发布之前更改了 IP 地址。但它实际上是我正在使用的内部 IP 地址。 Telnet 从计算机到 kafka 主机。另外,也不是通讯失败。正如我所说,如果我使用新主题,则会创建主题;只是发送此消息失败。
  • @Shimon 你检查你的advertised.listeners 配置了吗?您在该配置中指定的 IP 应该可以从您的客户端计算机访问,并且还应该指向运行 Kafka 的计算机的 IP
  • 对不起,我不知道如何设置advertised.listeners。我应该在客户端机器的哪里定义它?
  • @Shimon 这是一个代理配置。您需要编辑在启动 kafka 代理时提供的 server.properties 文件
  • 非常感谢@JavaTechnical。我发现我既需要advertised.listeners 配置,也需要停止firewalld 守护进程:)。我稍后会配置防火墙,但现在advertised.listeners 成功了!发自内心的感谢。
猜你喜欢
  • 1970-01-01
  • 2020-12-22
  • 1970-01-01
  • 2019-03-18
  • 1970-01-01
  • 2017-06-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多