【问题标题】:Error serializing Avro message - Kafka Schema Registry序列化 Avro 消息时出错 - Kafka Schema Registry
【发布时间】:2020-06-26 05:29:25
【问题描述】:

我正在创建一个 avro 类,其中包含一个字符串和一个映射作为字段。 我可以通过 maven 生成 avro 类,并且可以在 localhost:8081 中创建注册表

.avsc 文件:

    {
"type":"record",
"name":"AvroClass",
"namespace":"belliPack.avro",
"fields":[
{
"name":"title",
"type":"string"
},
{
"name":"map",
"type": {"type": "map", "values": "double"}
}
]
}

架构注册表返回: $ curl -X GET http://localhost:8081/subjects/teste1-value/versions/1

{"subject":"teste1-value","version":1,"id":42,"schema":"{"type":"record","name":"AvroClass","namespace":"belliPack.avro","fields":[{"name":"title","type":"string"},{"name":"map","type":{"type":"map","values":"double"}}]}"}

我的 Kafka Producer Class 是:

public KafkaProducer<String, AvroClass> createKafkaProducer() {
    String bootstrapServer = "127.0.0.1:9092";
    String schemaRegistryURL = "127.0.0.1:8081";

    //create Producer properties
    Properties properties = new Properties();
    //kafka documentation>producer configs
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryURL);

    //create producer
    KafkaProducer<String, AvroClass> producer = new KafkaProducer<>(properties);
    return producer;
}

但在运行我的 Kafka Producer 时出现此错误:

    Exception in thread "Thread-1" Exception in thread "Thread-3" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: no protocol: 127.0.0.1:8081/subjects/teste1-value/versions
at java.base/java.net.URL.(URL.java:644)
at java.base/java.net.URL.(URL.java:540)
at java.base/java.net.URL.(URL.java:487)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:175)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at belliPack.Kafka.Kafka_Producer.sendData(Kafka_Producer.java:32)
at belliPack.OPC.ExtractNodeValues.run(ExtractNodeValues.java:82)
at java.base/java.lang.Thread.run(Thread.java:834)
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: no protocol: 127.0.0.1:8081/subjects/teste1-value/versions
at java.base/java.net.URL.(URL.java:644)
at java.base/java.net.URL.(URL.java:540)
at java.base/java.net.URL.(URL.java:487)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:175)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at belliPack.Kafka.Kafka_Producer.sendData(Kafka_Producer.java:32)
at belliPack.OPC.ExtractNodeValues.run(ExtractNodeValues.java:82)
at java.base/java.lang.Thread.run(Thread.java:834)```

【问题讨论】:

    标签: java apache-kafka avro confluent-schema-registry


    【解决方案1】:

    java.net.MalformedURLException: 没有协议

    客户端如何知道您需要 http 还是 https?没有默认值,因此您必须在注册表 url 上提供它

    【讨论】:

    • 就在鼻子里......谢谢
    • 嗨@OneCricketeer 我发现对于某些主题我需要指定“http://”,但对于一些我没有。你知道是什么控制着它吗?
    • @yifei 例外来自序列化程序配置,它不是特定于主题的。听起来您并不总是在所有主题上都使用方案注册表
    • @OneCricketeer 我从一个集群中使用多个主题,它们都共享相同的模式注册表。虽然每个主题都有一个配置文件。我发现对于某些主题,他们的配置文件指定了没有“http://”的模式注册表,但它们仍然有效。其余的,我必须把协议。
    • @yifei 没错。字符串反序列化器不使用模式注册表,因此该属性什么都不做
    猜你喜欢
    • 1970-01-01
    • 2018-01-31
    • 1970-01-01
    • 2017-12-02
    • 1970-01-01
    • 2016-10-08
    • 2019-08-05
    • 2020-08-11
    • 2020-08-29
    相关资源
    最近更新 更多