【问题标题】:Avro logicalType String Date conversion to EPOCH timestamp-millis(Avro logicalType:将字符串日期转换为 EPOCH 毫秒时间戳)
【发布时间】:2020-10-01 08:04:43
【问题描述】:

我有以下架构

     {"name": "timestampstring", "type": [{"type":"string","logicalType":"timestamp-millis"}, "null"]},

我打算向它提供日期,并让转换将日期转换为纪元。

    // Build a record whose "timestampstring" field carries a date-time string.
    // NOTE(review): within this fragment user2 is only populated, never written to file1.
    GenericRecord user2 = new GenericData.Record(schema1);
    user2.put("timestampstring", "2019-01-26T12:00:40.931");

    // Register the custom conversion on a dedicated GenericData instance and hand that
    // instance to the reader, using schema2 as both the writer and the reader schema.
    final GenericData genericData = new GenericData();
    genericData.addLogicalTypeConversion(new MyTimestampConversion());
    datumReader = new GenericDatumReader<GenericRecord>(schema2, schema2, genericData);

    // Iterate over every record stored in file1 and print it; the previous record
    // instance is passed back into next(...) on each iteration.
    GenericRecord user = null;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file1, datumReader)) {
        while (dataFileReader.hasNext()) {
            user = dataFileReader.next(user);

            System.out.println(user);
        }
    }

//转换码

public static class MyTimestampConversion extends Conversion<Long> {
    public MyTimestampConversion() {
    }

    public Class<Long> getConvertedType() {
        return Long.class;
    }

    public String getLogicalTypeName() {
        return "timestamp-millis";
    }

    public Long fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
        return 123L;
    }
}

但是这段代码不起作用……我期望它把日期字符串转换为毫秒级时间戳(timestamp-millis;上面的示例中我暂时硬编码返回 123L)。

有什么帮助吗?

【问题讨论】:

    标签: java avro


    【解决方案1】:

    参考《How to define a LogicalType in Avro (java)》,我通过创建自定义逻辑类型解决了这个问题。直接对 string 使用内置的 “timestamp-millis” logicalType 似乎行不通,所以我定义了自己的 logicalType:

    package example;
    
    import org.apache.avro.Conversion;
    import org.apache.avro.LogicalType;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.Schema.Parser;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.*;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.ResolvingDecoder;
    import org.joda.time.DateTime;
    
    import java.io.File;
    import java.io.IOException;
    
    /**
     * Demo that stores date-time strings as epoch milliseconds in an Avro file by means of a
     * custom logical type ({@code UtcDateTimeToTimestampMilisLogicalType}) and a matching
     * {@link Conversion}.
     *
     * <p>The program registers the logical type, writes three records to {@code users.avro},
     * reads them back once without any conversion (raw longs are printed) and once with the
     * conversion installed (the longs are rendered as date-time strings again).
     */
    public class AvroWriteDateUtcToEpochMili {

        /** Location of the Avro schema used for both writing and reading. */
        private static final String SCHEMA_PATH = "./userdate_modified_string.avsc";

        /**
         * Runs the write / read-raw / read-converted round trip.
         *
         * @param args unused
         * @throws IOException if the schema or the data file cannot be read or written
         */
        public static void main(String[] args) throws IOException {
            // Demo toggles: flip to skip registering the logical type or rewriting the file.
            final boolean isRegisterNewLogicalType = true;
            final boolean isWrite = true;

            // The logical type has to be registered before the schema is parsed.
            if (isRegisterNewLogicalType) {
                registerLogicalType();
            }

            Schema schema1 = new Parser().parse(new File(SCHEMA_PATH));

            // Serialize the sample users to disk.
            File file1 = new File("users.avro");
            if (isWrite) {
                writeUsers(schema1, file1);
            }

            // First pass: plain reader without conversions, so the stored longs are printed as-is.
            printRecords(file1, new GenericDatumReader<GenericRecord>(schema1), true);

            // Second pass: reader with the conversion installed.
            System.out.println("//AFTER");

            Schema schema2 = new Parser().parse(new File(SCHEMA_PATH));
            final GenericData genericData = new GenericData();
            genericData.addLogicalTypeConversion(new MyStringTimestampConversion());
            printRecords(file1, new MyReader<GenericRecord>(schema2, schema2, genericData), false);
        }

        /** Registers the custom logical type under its type name, always handing out one shared instance. */
        private static void registerLogicalType() {
            LogicalTypes.register(UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME, new LogicalTypes.LogicalTypeFactory() {
                private final LogicalType convertLongLogicalType = new UtcDateTimeToTimestampMilisLogicalType();

                @Override
                public LogicalType fromSchema(Schema schema) {
                    return convertLongLogicalType;
                }
            });
        }

        /**
         * Writes three sample records to {@code target}.
         *
         * <p>The {@code timestamplong} field is fed either a date-time string (user1, user3) or a
         * raw {@code long} (user2); the writer is configured with
         * {@link MyStringTimestampConversion} so string values can be turned into longs.
         *
         * @param schema record schema to write with
         * @param target file that receives the Avro container data
         * @throws IOException if the file cannot be created or written
         */
        private static void writeUsers(Schema schema, File target) throws IOException {
            GenericRecord user1 = new GenericData.Record(schema);
            user1.put("timestamplong", "2019-07-09T04:31:45.281Z");
            user1.put("timestampstring", "2019-07-09T04:31:45.281Z");

            GenericRecord user2 = new GenericData.Record(schema);
            user2.put("timestamplong", 2L);
            user2.put("timestampstring", (new DateTime(2L)).toString());

            var currentDateTime = DateTime.now();

            GenericRecord user3 = new GenericData.Record(schema);
            user3.put("timestamplong", currentDateTime.toString());
            user3.put("timestampstring", currentDateTime.toString());

            final GenericData genericData2 = new GenericData();
            genericData2.addLogicalTypeConversion(new MyStringTimestampConversion());

            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema, genericData2);
            // try-with-resources closes (and thereby flushes) the writer even when an append fails.
            try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) {
                dataFileWriter.create(schema, target);
                dataFileWriter.append(user1);
                dataFileWriter.append(user2);
                dataFileWriter.append(user3);
            }
        }

        /**
         * Prints every record of {@code source} to standard output.
         *
         * @param source           Avro container file to read
         * @param datumReader      reader used to decode the records
         * @param printSchemaFirst when {@code true}, the schema of the first record is printed
         *                         once before the records themselves
         * @throws IOException if the file cannot be read
         */
        private static void printRecords(File source, DatumReader<GenericRecord> datumReader, boolean printSchemaFirst)
                throws IOException {
            boolean schemaPending = printSchemaFirst;
            GenericRecord user = null;
            try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(source, datumReader)) {
                while (dataFileReader.hasNext()) {
                    // Pass the previous record back in so the reader may reuse it.
                    user = dataFileReader.next(user);

                    if (schemaPending) {
                        System.out.println(user.getSchema());
                        schemaPending = false;
                    }

                    System.out.println(user);
                }
            }
        }

        /**
         * Datum reader that, after decoding a value, applies the {@link Conversion} registered
         * for the value's logical type (if the schema has one and a conversion is available).
         *
         * @param <G> record type produced by this reader
         */
        public static class MyReader<G extends IndexedRecord> extends GenericDatumReader<G> {
            public MyReader() {
                super();
            }

            /**
             * @param writer schema the data was written with
             * @param reader schema the data should be read as
             * @param data   data model holding the registered conversions
             */
            public MyReader(Schema writer, Schema reader, GenericData data) {
                super(writer, reader, data);
            }

            /**
             * Decodes one value and converts it when its schema carries a logical type for
             * which the data model has a conversion; otherwise the raw datum is returned.
             */
            @Override
            protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
                Object datum = this.readWithoutConversion(old, expected, in);
                LogicalType logicalType = expected.getLogicalType();
                if (logicalType == null) {
                    return datum;
                }

                Conversion<?> conversion = this.getData().getConversionFor(logicalType);
                if (conversion == null) {
                    return datum;
                }

                return this.convert(datum, expected, logicalType, conversion);
            }
        }

        /**
         * Conversion between date-time strings and epoch milliseconds for the custom
         * logical type named by
         * {@code UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME}.
         */
        public static class MyStringTimestampConversion extends Conversion<String> {
            public MyStringTimestampConversion() {
                super();
            }

            @Override
            public Class<String> getConvertedType() {
                return String.class;
            }

            /** @return the custom logical type name (not the built-in "timestamp-millis") */
            @Override
            public String getLogicalTypeName() {
                return UtcDateTimeToTimestampMilisLogicalType.CONVERT_LONG_TYPE_NAME;
            }

            /**
             * Renders stored epoch milliseconds as a date-time string.
             *
             * @param millisFromEpoch milliseconds since the epoch as stored in the file
             * @return the string form of a Joda {@link DateTime} built from the millis
             */
            @Override
            public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
                return (new DateTime(millisFromEpoch)).toString();
            }

            /**
             * Parses a date-time string and returns its epoch milliseconds.
             *
             * @param value date-time text accepted by {@link DateTime#parse(String)}
             * @return milliseconds since the epoch for the parsed instant
             */
            @Override
            public Long toLong(String value, Schema schema, LogicalType type) {
                // See https://stackoverflow.com/questions/22681348/joda-datetime-to-unix-datetime
                // getMillis() yields the epoch millis directly, without a detour through java.util.Date.
                return DateTime.parse(value).getMillis();
            }
        }
    }
    

    逻辑类型

    /**
     * Custom Avro logical type named {@code "utc-to-epoch-millis"} that may only annotate
     * schemas of type {@code long}.
     */
    public class UtcDateTimeToTimestampMilisLogicalType extends LogicalType {
        //The key to use as a reference to the type
        public static final String CONVERT_LONG_TYPE_NAME = "utc-to-epoch-millis";

        public UtcDateTimeToTimestampMilisLogicalType() {
            super(CONVERT_LONG_TYPE_NAME);
        }

        /**
         * Verifies that the annotated schema is a {@code long}.
         *
         * @param schema the schema this logical type is attached to
         * @throws IllegalArgumentException if the schema type is not {@code LONG}
         */
        @Override
        public void validate(Schema schema) {
            super.validate(schema);
            if (schema.getType() != Schema.Type.LONG) {
                // The message names the type that is actually checked above (long, not bytes).
                throw new IllegalArgumentException(
                        "Logical type 'utc-to-epoch-millis' must be backed by long");
            }
        }
    }
    

    架构

    {
        "namespace": "example.avro.modified.string",
        "type": "record",
        "name": "UserDate",
        "fields": [
            {
                "name": "timestamplong",
                "type": 
                    {
                        "type": "long",
                        "logicalType": "utc-to-epoch-millis"
                    }
            },
            {
                "name": "timestampstring",
                "type": "string"
            }
        ]
    }
    

    结果

    {"type":"record","name":"UserDate","namespace":"example.avro.modified.string","fields":[{"name":"timestamplong","type":{"type":"long","logicalType":"utc-to-epoch-millis"}},{"name":"timestampstring","type":"string"}]}
    {"timestamplong": 1562646705281, "timestampstring": "2019-07-09T04:31:45.281Z"}
    {"timestamplong": 2, "timestampstring": "1970-01-01T07:30:00.002+07:30"}
    {"timestamplong": 1601616694713, "timestampstring": "2020-10-02T13:31:34.713+08:00"}
    //AFTER
    {"timestamplong": "2019-07-09T12:31:45.281+08:00", "timestampstring": "2019-07-09T04:31:45.281Z"}
    {"timestamplong": "1970-01-01T07:30:00.002+07:30", "timestampstring": "1970-01-01T07:30:00.002+07:30"}
    {"timestamplong": "2020-10-02T13:31:34.713+08:00", "timestampstring": "2020-10-02T13:31:34.713+08:00"}
    
    Process finished with exit code 0
    

    【讨论】:

      猜你喜欢
      • 2017-03-23
      • 2015-08-19
      • 2017-09-01
      • 2012-04-01
      • 2017-06-03
      • 2011-10-09
      • 2020-06-15
      • 2019-12-24
      • 2016-11-26
      相关资源
      最近更新 更多