【Question title】: KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord
【Posted】: 2017-01-29 01:59:00
【Question】:

My KafkaProducer is able to serialize objects to my topic using KafkaAvroSerializer. However, KafkaConsumer.poll() returns a deserialized GenericRecord instead of my serialized class.

My KafkaProducer:

    KafkaProducer<CharSequence, MyBean> producer;
    try (InputStream props = Resources.getResource("producer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");

      MyBean bean = new MyBean();
      producer = new KafkaProducer<>(properties);
      producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
    }

My KafkaConsumer:

    Properties properties = new Properties();
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
      properties.load(props);
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");
      consumer = new KafkaConsumer<>(properties);
    }
    consumer.subscribe(Arrays.asList(topic));
    try {
      while (true) {
        ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
        if (records.isEmpty()) {
          continue;
        }
        for (ConsumerRecord<CharSequence, MyBean> record : records) {
          MyBean bean = record.value(); // <-------- This throws a ClassCastException because GenericRecord cannot be cast to MyBean
          System.out.println("consumer received: " + bean);
        }
      }
    } finally {
      consumer.close();
    }

The line MyBean bean = record.value(); throws a ClassCastException because GenericRecord cannot be cast to MyBean.

I am using kafka-client-0.9.0.1 and kafka-avro-serializer-3.0.0.

【Question comments】:

    Tags: java apache-kafka avro confluent-platform confluent-schema-registry


    【Solution 1】:

    KafkaAvroDeserializer supports SpecificData

    It is not enabled by default. To enable it:

    properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
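If the settings live in a consumer.props file instead of code, the same switch can be written as a plain string key. A minimal sketch, assuming KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG resolves to the key specific.avro.reader (worth checking against your serializer version); the broker address, registry URL, group id and the helper name consumerProps are placeholders, not part of the question:

```java
import java.util.Properties;

public class SpecificReaderProps {

    /**
     * Builds consumer properties that make KafkaAvroDeserializer return generated
     * SpecificRecord classes instead of GenericRecord.
     * Host names, URL and group id are placeholder assumptions.
     */
    public static Properties consumerProps(String bootstrapServers, String registryUrl, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId); // needed for consumer.subscribe(...)
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", registryUrl);
        // Assumed string form of SPECIFIC_AVRO_READER_CONFIG; off by default,
        // in which case the deserializer hands back GenericRecord.
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties p = consumerProps("localhost:9092", "http://localhost:8081", "mybean-consumer");
        p.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```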
    

    KafkaAvroDeserializer does not support ReflectData

    Confluent's KafkaAvroDeserializer does not know how to deserialize using Avro ReflectData. I had to extend it to support Avro ReflectData:

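    import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.avro.Schema;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.avro.reflect.ReflectDatumReader;
    import org.apache.kafka.common.errors.SerializationException;

    /**
     * Extends KafkaAvroDeserializer to support ReflectData.
     *
     * @param <V>
     *     value type
     */
    public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {

      // Reader schema derived once, by reflection, from the target class.
      private final Schema readerSchema;
      private final DecoderFactory decoderFactory = DecoderFactory.get();

      protected ReflectKafkaAvroDeserializer(Class<V> type) {
        readerSchema = ReflectData.get().getSchema(type);
      }

      @Override
      protected Object deserialize(
          boolean includeSchemaAndVersion,
          String topic,
          Boolean isKey,
          byte[] payload,
          Schema readerSchemaIgnored) throws SerializationException {

        if (payload == null) {
          return null;
        }

        int schemaId = -1;
        try {
          ByteBuffer buffer = ByteBuffer.wrap(payload);
          // Framing: magic byte, 4-byte schema id, then the Avro-encoded body.
          if (buffer.get() != MAGIC_BYTE) {
            throw new SerializationException("Unknown magic byte!");
          }

          schemaId = buffer.getInt();
          // Writer schema comes from the registry; reader schema from the target class.
          Schema writerSchema = schemaRegistry.getByID(schemaId);

          int start = buffer.position() + buffer.arrayOffset();
          int length = buffer.limit() - 1 - idSize;
          DatumReader<Object> reader = new ReflectDatumReader<>(writerSchema, readerSchema);
          BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
          return reader.read(null, decoder);
        } catch (IOException e) {
          throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
        } catch (RestClientException e) {
          throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
        }
      }
    }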
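The start and length arithmetic above relies on Confluent's message framing: one magic byte (0), a 4-byte big-endian schema id, then the Avro binary body, so the body length is the payload length minus 1 minus idSize (4). The JDK-only sketch below round-trips that framing; WireFormat, frame and parse are hypothetical names for illustration, not Confluent APIs:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helpers illustrating the Confluent framing; not part of any library. */
public class WireFormat {
    public static final byte MAGIC_BYTE = 0x0;
    public static final int ID_SIZE = 4; // schema id is a 4-byte big-endian int

    /** Result of splitting a framed message into schema id and Avro body. */
    public static final class Parsed {
        public final int schemaId;
        public final byte[] body;

        Parsed(int schemaId, byte[] body) {
            this.schemaId = schemaId;
            this.body = body;
        }
    }

    /** Prefixes an Avro-encoded body with the magic byte and schema id. */
    public static byte[] frame(int schemaId, byte[] avroBody) {
        return ByteBuffer.allocate(1 + ID_SIZE + avroBody.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroBody)
                .array();
    }

    /** Mirrors the offset/length computation used in the deserializer above. */
    public static Parsed parse(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        int schemaId = buffer.getInt();
        int start = buffer.position() + buffer.arrayOffset(); // 5 for a freshly wrapped array
        int length = buffer.limit() - 1 - ID_SIZE;            // bytes left for the Avro body
        return new Parsed(schemaId, Arrays.copyOfRange(buffer.array(), start, start + length));
    }

    public static void main(String[] args) {
        Parsed p = parse(frame(42, new byte[] {1, 2, 3}));
        System.out.println(p.schemaId + " " + Arrays.toString(p.body)); // 42 [1, 2, 3]
    }
}
```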
    

    Define a custom deserializer class that deserializes into MyBean:

    public class MyBeanDeserializer extends ReflectKafkaAvroDeserializer<MyBean> {
      public MyBeanDeserializer() {
        super(MyBean.class);
      }
    }
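The concrete subclass exists because, when a deserializer is configured by class (as with VALUE_DESERIALIZER_CLASS_CONFIG), Kafka creates it reflectively through its public no-argument constructor, so the Class<V> token has to be fixed in a subclass. The JDK-only sketch below mimics that pattern; TypedDecoder, Bean and BeanDecoder are hypothetical stand-ins for ReflectKafkaAvroDeserializer, MyBean and MyBeanDeserializer, and newInstance only approximates Kafka's internal instantiation:

```java
/** Hypothetical stand-ins showing why the subclass supplies a no-arg constructor. */
public class NoArgConstructorDemo {

    /** Plays the role of ReflectKafkaAvroDeserializer: needs the target type up front. */
    public abstract static class TypedDecoder<V> {
        private final Class<V> type;

        protected TypedDecoder(Class<V> type) {
            this.type = type;
        }

        public Class<V> targetType() {
            return type;
        }
    }

    /** Plays the role of MyBean. */
    public static class Bean {}

    /** Plays the role of MyBeanDeserializer: binds the type and exposes a no-arg constructor. */
    public static class BeanDecoder extends TypedDecoder<Bean> {
        public BeanDecoder() {
            super(Bean.class);
        }
    }

    /** Creates an instance via the no-arg constructor, roughly how a class-based config is instantiated. */
    public static <T> T newInstance(Class<T> c) throws ReflectiveOperationException {
        return c.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        TypedDecoder<?> decoder = newInstance(BeanDecoder.class);
        System.out.println(decoder.targetType().getSimpleName()); // Bean
    }
}
```

The abstract base on its own has no no-argument constructor, so reflective creation of it fails; only the bound subclass can be named in the consumer configuration.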
    

    Configure the KafkaConsumer to use the custom deserializer class:

    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);
    

    【Comments】:

    • Thanks! I thought the generated bean extends SpecificRecordBase and implements SpecificRecord, so what does it have to do with Avro ReflectData? I'm new to Avro, so I just want to understand it better.
    • I tried your code and got the following exception: Caused by: org.apache.avro.AvroTypeException: Found string, expecting com.MyBean at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:223) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) at org.apache.avro.generic.GenericDatumReader.read(Gene
    • Apart from switching to the new deserializer you provided, I kept everything the same.
    • I didn't know you were using SpecificData. I have updated my answer.
    • properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true) worked for me. By the way, is this documented anywhere? I have been solving this by trial and error.
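
    【Solution 2】:

    Edit: ReflectData support has since been merged (see below).

    To add to Chin Huang's answer: for less code and better performance, you should implement it this way: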

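    import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificData;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificRecordBase;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    /**
     * Extends AbstractKafkaAvroDeserializer to return a generated SpecificRecord type.
     *
     * @param <T>
     *     value type
     */
    public abstract class SpecificKafkaAvroDeserializer<T extends SpecificRecordBase>
        extends AbstractKafkaAvroDeserializer implements Deserializer<T> {
      // Reader schema, resolved once from the generated class instead of per message.
      private final Schema readerSchema;
      private final DecoderFactory decoderFactory = DecoderFactory.get();

      protected SpecificKafkaAvroDeserializer(Class<T> type, Map<String, ?> props) {
        this.readerSchema = SpecificData.get().getSchema(type);
        // Configure eagerly: without a no-arg constructor this class is meant to be
        // passed to KafkaConsumer as an instance rather than by class name.
        this.configure(this.deserializerConfig(props));
      }

      @Override
      public void configure(Map<String, ?> configs, boolean isKey) {
        this.configure(new KafkaAvroDeserializerConfig(configs));
      }

      @Override
      public T deserialize(String topic, byte[] data) {
        return deserialize(false, topic, false, data, readerSchema);
      }

      @Override
      public void close() {
      }

      @Override
      protected T deserialize(
              boolean includeSchemaAndVersion,
              String topic,
              Boolean isKey,
              byte[] payload,
              Schema readerSchemaIgnored) throws SerializationException {

        if (payload == null) {
          return null;
        }

        int schemaId = -1;
        try {
          ByteBuffer buffer = ByteBuffer.wrap(payload);
          if (buffer.get() != MAGIC_BYTE) {
            throw new SerializationException("Unknown magic byte!");
          }

          schemaId = buffer.getInt();
          Schema writerSchema = schemaRegistry.getByID(schemaId);

          int start = buffer.position() + buffer.arrayOffset();
          int length = buffer.limit() - 1 - idSize;
          SpecificDatumReader<T> reader = new SpecificDatumReader<>(writerSchema, readerSchema);
          BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
          return reader.read(null, decoder);
        } catch (IOException e) {
          throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
        } catch (RestClientException e) {
          throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
        }
      }
    }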
    

    【Comments】:
