【问题标题】:Kafka Avro Serializer and deserializer exception. Avro supported typesKafka Avro 序列化器和反序列化器异常。 Avro 支持的类型
【发布时间】:2018-12-10 23:31:40
【问题描述】:

我看到以下错误

exception Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

我的 kafka 制作人道具是

Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("value.converter.schema.registry.url", "http://localhost:8081");
    props.put("producer.type", "sync");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);

    Producer<String, TweetInfoDto> producer = new KafkaProducer(props);

我的卡夫卡消费者道具是

Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "twitterCrawler");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("value.converter.schema.registry.url", "http://localhost:8081");

    Consumer<String, TweetInfoDto> consumer = new KafkaConsumer(props);

不知道我做错了什么。

【问题讨论】:

  • 那么,TweetInfoDtoIndexedRecord,例如 SpecificAvroRecord
  • 另外,value.converter 用于 Connect API,而不是生产者/消费者

标签: java serialization apache-kafka avro confluent-schema-registry


【解决方案1】:

TweetInfoDto 不能是您自己定义的纯 Java 对象。

理想情况下,它应该通过 Avro Maven Plugin 从 Avro 模式创建,例如。

请参阅Schema Registry Tutorial 了解所有步骤,包括定义 AVSC,并为其生成 Java 类。

Tutorial sample code here

【讨论】:

    【解决方案2】:

    除了 cricket_007 提到的内容之外,可以考虑使用 avro tools - Serializing and deserializing with code generation

    【讨论】:

    • 这就是教程中avro-maven-plugin使用的内容
    猜你喜欢
    • 2015-08-01
    • 2019-11-18
    • 2021-03-11
    • 2020-10-05
    • 2019-08-30
    • 2019-04-04
    • 2018-08-11
    • 2019-07-30
    • 1970-01-01
    相关资源
    最近更新 更多