【Question title】: Generic conversion from POJO to Avro Record
【Posted】: 2015-09-21 08:27:05
【Question】:

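I am looking for a way to convert a POJO to an Avro object generically. The implementation should be robust to any change in the POJO class. I have a working version, but it fills the Avro record explicitly (see the example below).

Is there a way to get rid of the hard-coded field names and populate the Avro record directly from the object? Is reflection the only option, or does Avro provide this out of the box?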

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.reflect.ReflectData;

public class PojoToAvroExample {

    static class PojoParent {
        public final Map<String, String> aMap = new HashMap<String, String>();
        public final Map<String, Integer> anotherMap = new HashMap<String, Integer>();
    }

    static class Pojo extends PojoParent {
        public String uid;
        public Date eventTime;
    }

    static Pojo createPojo() {
        Pojo foo = new Pojo();
        foo.uid = "123";
        foo.eventTime = new Date();
        foo.aMap.put("key", "val");
        foo.anotherMap.put("key", 42);
        return foo;
    }

    public static void main(String[] args) {
        // extract the avro schema corresponding to Pojo class
        Schema schema = ReflectData.get().getSchema(Pojo.class);
        System.out.println("extracted avro schema: " + schema);
        // create avro record corresponding to schema
        Record avroRecord = new Record(schema);
        System.out.println("corresponding empty avro record: " + avroRecord);

        Pojo foo = createPojo();
        // TODO: to be replaced by generic variant:
        // something like avroRecord.importValuesFrom(foo);
        avroRecord.put("uid", foo.uid);
        avroRecord.put("eventTime", foo.eventTime);
        avroRecord.put("aMap", foo.aMap);
        avroRecord.put("anotherMap", foo.anotherMap);
        System.out.println("expected avro record: " + avroRecord);
    }
}

【Comments】:

Tags: java avro


【Solution 1】:

Are you using Spring?

I built a mapper for this using Spring utilities. The same kind of mapper could also be built with plain reflection:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.util.Assert;

public class GenericRecordMapper {

    public static GenericData.Record mapObjectToRecord(Object object) {
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
        final GenericData.Record record = new GenericData.Record(schema);
        schema.getFields().forEach(r -> record.put(r.name(), PropertyAccessorFactory.forDirectFieldAccess(object).getPropertyValue(r.name())));
        return record;
    }

    public static <T> T mapRecordToObject(GenericData.Record record, T object) {
        Assert.notNull(record, "record must not be null");
        Assert.notNull(object, "object must not be null");
        final Schema schema = ReflectData.get().getSchema(object.getClass());
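        // The field lists of both schemas must match exactly.
        Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");
        // Note: every non-null value is passed through toString(), so non-String fields (lists, numbers) are set from their string form.
        record.getSchema().getFields().forEach(d -> {
            Object value = record.get(d.name());
            PropertyAccessorFactory.forDirectFieldAccess(object)
                    .setPropertyValue(d.name(), value == null ? null : value.toString());
        });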
        return object;
    }

}

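With this mapper you can produce a GenericData.Record that is easy to serialize to Avro. After deserializing an Avro byte array, you can use the mapper to rebuild the POJO from the deserialized record:

Serialization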

byte[] serialized = avroSerializer.serialize("topic", GenericRecordMapper.mapObjectToRecord(yourPojo));

Deserialization

GenericData.Record deserialized = (GenericData.Record) avroDeserializer.deserialize("topic", serialized);

YourPojo yourPojo = GenericRecordMapper.mapRecordToObject(deserialized, new YourPojo());
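
The answer mentions that the same mapper could be built with plain reflection. A minimal sketch of that idea follows. It is not the author's code: the class names `ReflectionFieldWalker`, `Parent`, and `Child` are hypothetical, and a `Map` stands in for `GenericData.Record` so the example runs without Avro on the classpath. In a real mapper, the `putIfAbsent` call would presumably become `record.put(name, value)` for each field in the schema.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

public class ReflectionFieldWalker {

    // Hypothetical classes that mirror the shape of the question's PojoParent / Pojo.
    public static class Parent {
        public final Map<String, String> aMap = new LinkedHashMap<>();
    }

    public static class Child extends Parent {
        public String uid = "123";
    }

    /** Collects every non-static field, including inherited ones, keyed by field name. */
    public static Map<String, Object> fieldValues(Object source) {
        Map<String, Object> values = new LinkedHashMap<>();
        // Walk up the class hierarchy so fields declared in superclasses are included.
        for (Class<?> c = source.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
                    continue;
                }
                f.setAccessible(true);
                try {
                    // A field in a subclass wins over a same-named field in a superclass.
                    values.putIfAbsent(f.getName(), f.get(source));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return values;
    }

    public static void main(String[] args) {
        Child child = new Child();
        child.aMap.put("key", "val");
        System.out.println(fieldValues(child));
    }
}
```

Walking the superclass chain matters for the question's example, where the two maps are declared in the parent class rather than in the POJO itself.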

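【Comments】:

  • Nice, but it does not seem to handle lists and sets correctly: a List is translated into a string containing all of its elements, i.e. the list's toString.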
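
One possible way around the problem described in this comment is to convert only string-like values and leave everything else alone. The sketch below assumes the blanket toString() in Solution 1 exists because Avro's generic reader commonly returns strings as `org.apache.avro.util.Utf8`, which is a `CharSequence`; that motivation is a guess, not something the answer states. `ValueNormalizer` is a hypothetical helper, and `StringBuilder` stands in for `Utf8` so the code runs without Avro.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ValueNormalizer {

    /**
     * Converts CharSequence values to String, recursing into collections and maps.
     * Any other value, including null, is returned unchanged.
     */
    public static Object normalize(Object value) {
        if (value instanceof CharSequence) {
            return value.toString();
        }
        if (value instanceof Collection) {
            List<Object> out = new ArrayList<>();
            for (Object item : (Collection<?>) value) {
                out.add(normalize(item));
            }
            return out;
        }
        if (value instanceof Map) {
            Map<Object, Object> out = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                out.put(normalize(e.getKey()), normalize(e.getValue()));
            }
            return out;
        }
        return value;
    }

    public static void main(String[] args) {
        // StringBuilder plays the role of Utf8 in this illustration.
        System.out.println(normalize(new StringBuilder("123")) instanceof String);
        System.out.println(normalize(Arrays.asList(new StringBuilder("a"), 2)));
    }
}
```

In the mapper, `normalize(record.get(name))` would take the place of `record.get(name).toString()`. Note that every collection comes back as a `List`, so a field declared as a `Set` would still need an extra conversion.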
【Solution 2】:

Here is a generic way to do the conversion

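import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;

// Serializes any POJO to Avro binary, using a schema derived from the class by reflection.
public static <V> byte[] toBytesGeneric(final V v, final Class<V> cls) {
    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
    final Schema schema = ReflectData.get().getSchema(cls);
    final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema);
    final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null);
    try {
        writer.write(v, binEncoder);
        // Flush so that buffered bytes reach the output stream before it is read.
        binEncoder.flush();
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
    return bout.toByteArray();
}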

public static void main(String[] args) {
    PojoClass pojoObject = new PojoClass();
    toBytesGeneric(pojoObject, PojoClass.class);
}

【Comments】:

【Solution 3】:

With jackson/avro it is easy to convert a POJO to byte[], much like with jackson/json:

    byte[] avroData = avroMapper.writer(schema).writeValueAsBytes(pojo);
    

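P.S.
Jackson handles not only JSON but also XML/Avro/Protobuf/YAML and more, with very similar classes and APIs.

【Comments】:

【Solution 4】:

Following up on my comment to @TranceMaster, the modified version below works for primitive types and Java sets: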

      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.reflect.ReflectData;
      import org.springframework.beans.PropertyAccessorFactory;
      import org.springframework.util.Assert;
      
      public class GenericRecordMapper {
      
          public static GenericData.Record mapObjectToRecord(Object object) {
              Assert.notNull(object, "object must not be null");
              final Schema schema = ReflectData.get().getSchema(object.getClass());
              System.out.println(schema);
              final GenericData.Record record = new GenericData.Record(schema);
              schema.getFields().forEach(r -> record.put(r.name(), PropertyAccessorFactory.forDirectFieldAccess(object).getPropertyValue(r.name())));
              return record;
          }
      
          public static <T> T mapRecordToObject(GenericData.Record record, T object) {
              Assert.notNull(record, "record must not be null");
              Assert.notNull(object, "object must not be null");
      
              final Schema schema = ReflectData.get().getSchema(object.getClass());
              Assert.isTrue(schema.getFields().equals(record.getSchema().getFields()), "Schema fields didn't match");
      
              record
                      .getSchema()
                      .getFields()
                      .forEach(field ->
                          PropertyAccessorFactory
                                  .forDirectFieldAccess(object)
                                  .setPropertyValue(field.name(), record.get(field.name()))
                      );
              return object;
          }
      }
      

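【Comments】:

【Solution 5】:

I needed exactly this myself. The library you need is in the avro JAR files, but oddly there seems to be no way to invoke it from the avro-tools command line.

Invoke it as: java GenerateSchemaFromPOJO com.example.pojo.Person Person.java (the second argument is the file the schema is written to)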

        import java.io.FileWriter;
        import java.io.IOException;
        import java.io.Writer;
        
        import org.apache.avro.Schema;
        
        import com.fasterxml.jackson.databind.ObjectMapper;
        import com.fasterxml.jackson.dataformat.avro.AvroFactory;
        import com.fasterxml.jackson.dataformat.avro.AvroSchema;
        import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
        import com.fasterxml.jackson.dataformat.avro.schema.VisitorFormatWrapperImpl;
        
        public class GenerateSchemaFromPOJO {
        
            public static void main(String[] args) {
                String className  = null;
                String outputFile = null;
                Writer outputWriter = null;
                try {
                    if(args.length != 2) {
                        System.out.println("Usage: java " + GenerateSchemaFromPOJO.class.getCanonicalName() + " classname output-schema-file.json");
                        System.exit(1);
                    }
                    className = args[0];
                    outputFile = args[1];
        
                    Class<?> clazz = Class.forName(className);
        
                    AvroFactory avroFactory = new AvroFactory();
                    ObjectMapper mapper = new ObjectMapper(avroFactory);
        
                    AvroSchemaGenerator gen = new AvroSchemaGenerator();
                    mapper.acceptJsonFormatVisitor(clazz, gen);
                    AvroSchema schemaWrapper = gen.getGeneratedSchema();
        
                    Schema avroSchema = schemaWrapper.getAvroSchema();
                    String asJson = avroSchema.toString(true);
        
                    outputWriter = new FileWriter(outputFile);
                    outputWriter.write(asJson);
                } catch (Exception ex) {
                    System.err.println("caught " + ex);
                    ex.printStackTrace();
                    System.exit(1);
                } finally {
                    if(outputWriter != null) {
                        try {
                            outputWriter.close();
                        } catch (IOException e) {
                            System.err.println("Caught " + e + " while trying to close outputWriter to " + outputFile);
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
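
The manual close in the finally block above could probably be shortened with try-with-resources. The sketch below shows only the file-writing part, under assumptions: `SchemaFileWriter` and `roundTrip` are hypothetical names, and the JSON string is a placeholder for what `avroSchema.toString(true)` would produce in the answer's program.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class SchemaFileWriter {

    /** Writes schemaJson to target; try-with-resources closes the writer even on failure. */
    public static void writeSchema(String schemaJson, Path target) {
        try (Writer writer = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            writer.write(schemaJson);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo helper: writes to a temporary file, reads the content back, and deletes the file. */
    public static String roundTrip(String schemaJson) {
        try {
            Path target = Files.createTempFile("schema", ".json");
            writeSchema(schemaJson, target);
            String content = new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
            Files.delete(target);
            return content;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Placeholder JSON standing in for the generated schema.
        System.out.println(roundTrip("{\"type\":\"record\",\"name\":\"Person\",\"fields\":[]}"));
    }
}
```

Compared with the original, there is no nullable `outputWriter` variable and no nested try inside finally, since the writer's scope is limited to the block that uses it.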
        

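【Comments】:

• From your answer I understand that your code generates the Avro schema for a given clazz. That is not what I asked for in the question; I already do the same with ReflectData.get().getSchema(Pojo.class);. I am looking for a way to replace avroRecord.put(..., ...); with a generic variant.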