【Posted】: 2017-03-28 04:04:57
【Question】:
I have a publisher that accepts a GenericRecord.
@Override
public Future<RecordMetadata> publish(GenericRecord genericRecord) {
    Future<RecordMetadata> recordMetadataFuture =
        getPublisher().send(new ProducerRecord<>(producerConfiguration.getProperties()
            .getProperty(ProducerConfiguration.PROPERTY_NAME_TOPIC), "sample.key", genericRecord));
    return recordMetadataFuture;
}

private KafkaProducer<String, GenericRecord> getPublisher() {
    return new KafkaProducer<>(producerConfiguration.getProperties());
}
I have the following Avro schema:
{
  "type" : "record",
  "name" : "SampleDate",
  "namespace": "com.sample.data.generated.avro",
  "doc" : "sample date",
  "fields" : [
    {
      "name" : "sampleDate",
      "type" : {
        "type" : "int",
        "logicalType" : "date"
      }
    }
  ]
}
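For context, the `date` logical type in this schema annotates an `int` that holds the number of days since the Unix epoch (1970-01-01). The sketch below shows that mapping using only `java.time`; `AvroDateSketch` and its two methods are hypothetical names used for illustration and are not part of Avro.

```java
import java.time.LocalDate;

// Illustrative helper only: this class is hypothetical, not an Avro API.
final class AvroDateSketch {

    // Encode a calendar date as the int that an Avro `date` field stores:
    // the number of days since 1970-01-01.
    static int toAvroDate(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    // Decode the stored int back into a calendar date.
    static LocalDate fromAvroDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    public static void main(String[] args) {
        int encoded = toAvroDate(LocalDate.of(2017, 3, 28));
        System.out.println(encoded + " -> " + fromAvroDate(encoded));
    }
}
```

A logical-type conversion is what performs this kind of translation between the Java date object and the underlying `int` during serialization.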
I have built my own serializers:
Date serializer:
@Component
public class SampleDateSerializer implements Serializer<GenericRecord> {
    private AvroGenericSerializer serializer;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer = new AvroGenericSerializer(SampleDate.SCHEMA$);
    }

    @Override
    public byte[] serialize(String topic, GenericRecord data) {
        return serializer.serialize(data);
    }

    @Override
    public void close() {
    }
}
Generic serializer:
public class AvroGenericSerializer {
    private EncoderFactory avroEncoderFactory;
    private DecoderFactory avroDecoderFactory;
    private GenericDatumWriter<GenericRecord> avroWriter;
    private GenericDatumReader<GenericRecord> avroReader;

    public AvroGenericSerializer(Schema schema) {
        avroEncoderFactory = EncoderFactory.get();
        avroDecoderFactory = DecoderFactory.get();
        avroWriter = new GenericDatumWriter<>(schema);
        avroReader = new GenericDatumReader<>(schema);
    }

    public byte[] serialize(GenericRecord data) {
        final ByteArrayOutputStream stream = new ByteArrayOutputStream();
        final BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
        try {
            avroWriter.write(data, binaryEncoder);
            binaryEncoder.flush();
            stream.close();
            return stream.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Can't serialize Avro object", e);
        }
    }

    public GenericRecord deserialize(byte[] bytes) {
        try {
            return avroReader.read(null, avroDecoderFactory.binaryDecoder(bytes, null));
        } catch (IOException e) {
            throw new RuntimeException("Can't deserialize Avro object", e);
        }
    }
}
However, when testing my publisher class, I get the following error:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.sample.data.generated.avro.SampleDate to class com.sample.message.serialize.SampleDateSerializer specified in value.serializer
Debugging the code, I found that inside the
GenericDatumWriter.write()...
method, the following call returns null:
Conversion conversion = this.getData().getConversionByClass(datum.getClass(), logicalType);
The method being called is defined in org.apache.avro.generic.GenericData:
public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType) {
    Map conversions = (Map) this.conversionsByClass.get(datumClass);
    return conversions != null ? (Conversion) conversions.get(logicalType.getName()) : null;
}
Given this, is there a way for me to populate the
GenericData.conversionsByClass
map so that it returns the correct conversion for the
date logicalType?
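To make the lookup in question concrete, here is a simplified model of it in plain Java. It is not Avro's code: `ConversionRegistrySketch`, its `register` method, and the use of a `String` in place of a `Conversion` object are all assumptions made for illustration. It shows why the lookup yields null until something has been registered for the datum class and the logical type name. In Avro 1.8 and later, the public call that fills the real map appears to be `GenericData.addLogicalTypeConversion(...)`, with the resulting `GenericData` instance passed to the `GenericDatumWriter` constructor; treat that as a pointer to verify against the Avro version in use.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the two-level lookup; this is NOT Avro's implementation.
final class ConversionRegistrySketch {

    // Outer key: the Java class of the datum. Inner key: the logical type name.
    private final Map<Class<?>, Map<String, String>> conversionsByClass = new HashMap<>();

    // Hypothetical registration step; the String value stands in for a Conversion object.
    void register(Class<?> datumClass, String logicalTypeName, String conversionName) {
        conversionsByClass
            .computeIfAbsent(datumClass, key -> new HashMap<>())
            .put(logicalTypeName, conversionName);
    }

    // Same shape as the quoted method: null when nothing is registered for the class.
    String getConversionByClass(Class<?> datumClass, String logicalTypeName) {
        Map<String, String> conversions = conversionsByClass.get(datumClass);
        return conversions != null ? conversions.get(logicalTypeName) : null;
    }

    public static void main(String[] args) {
        ConversionRegistrySketch registry = new ConversionRegistrySketch();
        // Nothing registered yet, so the lookup finds no conversion.
        System.out.println(registry.getConversionByClass(LocalDate.class, "date"));
        registry.register(LocalDate.class, "date", "DateConversion");
        // After registration, the same lookup succeeds.
        System.out.println(registry.getConversionByClass(LocalDate.class, "date"));
    }
}
```

Note that the lookup is keyed by the runtime class of the field value, so a conversion only applies when the value stored in the record is of the class that conversion was registered for.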
【Discussion】:
Tags: serialization apache-kafka avro kafka-producer-api