【问题标题】:Topic not present in metadata after 60000 ms60000 毫秒后元数据中不存在主题
【发布时间】:2020-02-10 18:32:43
【问题描述】:

我在 MSK(Kafka) 中创建了一个主题。 而且我还注册了 avro 模式。 现在我正在尝试为该主题生成一条消息,但是当我运行我的生产者时,我得到以下错误

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic AVRO-AUDIT_EVENT not present in metadata after 60000 ms.

这是我生成 avro 消息的 java 代码

String topicName = "AVRO-AUDIT_EVENT";

        Properties props = new Properties();
        props.put("bootstrap.servers",
                "b-3.*****:9092,b-4.****:9092,b-5.****:9092");
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081/subjects/AVRO-AUDIT-EVENT/versions/");

        JSONObject job = new JSONObject(json);
        String bodyofJson = job.getString("body");
        JSONObject bodyJsonObj = new JSONObject(bodyofJson);
        System.out.println(bodyJsonObj.get("ID"));

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {

            producer.send(new ProducerRecord<String, String>(topicName, bodyJsonObj.get("ID").toString(), bodyofJson))
                    .get();

            System.out.println("Complete");
        } catch (Exception ex) {
            ex.printStackTrace(System.out);
        } finally {
            producer.close();
        }

我可以列出主题并查看主题名称,也可以阅读主题消息。但是当我运行这个时,我得到了这个错误。

问候

【问题讨论】:

    标签: java apache-kafka kafka-producer-api


    【解决方案1】:

    我假设你没有在本地运行 Kafka...

    删除这一行

    props.put("bootstrap.servers", "localhost:9092,localhost:9093");
    

    也可以在 AWS 中部署架构注册表,并更改此行(以及地址)

    props.put("schema.registry.url", "http://localhost:8081/");
    

    另外,如果您想发送 Avro,请创建一个 GenericRecord Avro 对象,而不是 JSON 字符串。否则,您的架构就是 "string"

    【讨论】:

    • 对不起,我没有得到你的点 AVRO 对象..我如何创建它..我正在从亚马逊 API 输入解析 json..
    • 您的代码根本不会生成 Avro 对象。删除JSONObject的所有用法,请阅读github.com/simplesteph/kafka-avro-course/blob/master/…
    • 我做了修改,但现在我收到org.apache.kafka.common.errors.SerializationException: Error serializing Avro message 错误.. 所以我是斗牛士使用模式和通用 avro 吗?
    • 我也已经在http://localhost:8081/subjects/AVRO-AUDIT-EVENT/versions/ 注册了架构,所以我给了这个位置吗?
    • 如果您观看完整的视频课程演练,也许会对您有所帮助udemy.com/course/confluent-schema-registry
    【解决方案2】:

    就我而言,周末它失去了与所有节点的连接。好像没有自动恢复连接。

    重启服务后,一切正常。

    【讨论】:

      猜你喜欢
      • 2020-12-22
      • 1970-01-01
      • 1970-01-01
      • 2019-03-18
      • 1970-01-01
      • 1970-01-01
      • 2018-03-28
      • 1970-01-01
      • 2017-08-31
      相关资源
      最近更新 更多