【问题标题】:SerializationException of Avro Date Object (Date LogicalType)Avro 日期对象的序列化异常 (Date LogicalType)
【发布时间】:2017-03-28 04:04:57
【问题描述】:

我有一个接受 GenericRecord 类的发布者。

@Override
public Future<RecordMetadata> publish(GenericRecord genericRecord) {
    Future<RecordMetadata> recordMetadataFuture =
            getPublisher().send(new ProducerRecord<>(producerConfiguration.getProperties()
                    .getProperty(ProducerConfiguration.PROPERTY_NAME_TOPIC), "sample.key",genericRecord));

    return recordMetadataFuture;
}

private KafkaProducer<String, GenericRecord> getPublisher() {
    return new KafkaProducer<>(producerConfiguration.getProperties());
}

我有以下 avro 架构:

{
"type" : "record",
"name" : "SampleDate",
"namespace": "com.sample.data.generated.avro",
"doc" : "sample date",
"fields" :  [
    {
        "name" : "sampleDate",
        "type" : {
            "type" : "int",
            "logicalType" : "date"
        }
    }
  ]
}

我已经构建了自己的序列化器:

日期序列化器:

@Component
public class SampleDateSerializer implements Serializer<GenericRecord> {

private AvroGenericSerializer serializer;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    serializer = new AvroGenericSerializer(SampleDate.SCHEMA$);
}

@Override
public byte[] serialize(String topic, GenericRecord data) {
    return serializer.serialize(data);
}

@Override
public void close() {

}

通用序列化器:

public class AvroGenericSerializer {
private EncoderFactory avroEncoderFactory;
private DecoderFactory avroDecoderFactory;
private GenericDatumWriter<GenericRecord> avroWriter;
private GenericDatumReader<GenericRecord> avroReader;

public AvroGenericSerializer(Schema schema) {
    avroEncoderFactory = EncoderFactory.get();
    avroDecoderFactory = DecoderFactory.get();
    avroWriter = new GenericDatumWriter<>(schema);
    avroReader = new GenericDatumReader<>(schema);
}

public byte[] serialize(GenericRecord data) {
    final ByteArrayOutputStream stream = new ByteArrayOutputStream();
    final BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
    try {
        avroWriter.write(data, binaryEncoder);
        binaryEncoder.flush();
        stream.close();
        return stream.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException("Can't serialize Avro object", e);
    }
}

public GenericRecord deserialize(byte[] bytes) {
    try {
        return avroReader.read(null, avroDecoderFactory.binaryDecoder(bytes, null));
    } catch (IOException e) {
        throw new RuntimeException("Can't deserialize Avro object", e);
    }
 }
}

但是,在测试我的发布者类时,我遇到了以下错误:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.sample.data.generated.avro.SampleDate to class com.sample.message.serialize.SampleDateSerializer specified in value.serializer

调试代码,发现

GenericDatumWriter.write()...

方法在调用时返回 null

Conversion conversion = this.getData().getConversionByClass(datum.getClass(), logicalType);

调用
    org.apache.avro.generic.GenericData

    public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType) {
    Map conversions = (Map)this.conversionsByClass.get(datumClass);
    return conversions != null?(Conversion)conversions.get(logicalType.getName()):null;
}

在这方面,有没有办法让我填充

 GenericData.conversionsByClass 

映射,以便它可以返回正确的转换器以用于给定的

 date logicalType?

【问题讨论】:

    标签: serialization apache-kafka avro kafka-producer-api


    【解决方案1】:

    我已经通过在我的 GenericDatumWriter 中传递 GenericData 对象来解决它。

    我的通用序列化器现在看起来像这样:

        public AvroGenericSerializer(Schema schema) {
        avroEncoderFactory = EncoderFactory.get();
        avroDecoderFactory = DecoderFactory.get();
        final GenericData genericData = new GenericData();
        genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
        avroWriter = new GenericDatumWriter<>(schema, genericData);
        avroReader = new GenericDatumReader<>(schema);
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-03-11
      • 1970-01-01
      • 2018-04-08
      • 1970-01-01
      • 2012-07-21
      • 1970-01-01
      • 2015-12-09
      相关资源
      最近更新 更多