【发布时间】:2019-07-06 12:53:57
【问题描述】:
我有一个工具,它使用org.apache.parquet.hadoop.ParquetWriter 将 CSV 数据文件转换为 parquet 数据文件。
目前,它只处理int32、double和string
我需要支持 parquet timestamp 逻辑类型(注释为 int96),我不知道该怎么做,因为我在网上找不到精确的规范。
这种时间戳编码 (int96) 似乎很少见,并且没有得到很好的支持。我在网上找到的规格细节很少。 This github README 声明:
保存为 int96 的时间戳由一天中的纳秒组成 (前 8 个字节)和儒略日(后 4 个字节)。
具体来说:
-
MessageType 架构中的列使用哪个镶木地板 Type?我假设我应该使用原始类型
PrimitiveTypeName.INT96,但我不确定是否有办法指定逻辑类型? - 如何写入数据?即我以什么格式将时间戳写入组?对于 INT96 时间戳,我假设我必须编写一些二进制类型?
这是我的代码的简化版本,它演示了我正在尝试做的事情。具体来说,看一下“TODO”cmets,这是代码中与上述问题相关的两点。
List<Type> fields = new ArrayList<>();
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int32_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "double_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.STRING, "string_col", null));
// TODO:
// Specify the TIMESTAMP type.
// How? INT96 primitive type? Is there a logical timestamp type I can use w/ MessageType schema?
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT96, "timestamp_col", null));
MessageType schema = new MessageType("input", fields);
// initialize writer
Configuration configuration = new Configuration();
configuration.setQuietMode(true);
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
new Path("output.parquet"),
new GroupWriteSupport(),
CompressionCodecName.SNAPPY,
ParquetWriter.DEFAULT_BLOCK_SIZE,
ParquetWriter.DEFAULT_PAGE_SIZE,
1048576,
true,
false,
ParquetProperties.WriterVersion.PARQUET_1_0,
configuration
);
// write CSV data
CSVParser parser = CSVParser.parse(new File(csv), StandardCharsets.UTF_8, CSVFormat.TDF.withQuote(null));
ArrayList<String> columns = new ArrayList<>(schemaMap.keySet());
int colIndex;
int rowNum = 0;
for (CSVRecord csvRecord : parser) {
rowNum ++;
Group group = f.newGroup();
colIndex = 0;
for (String record : csvRecord) {
if (record == null || record.isEmpty() || record.equals( "NULL")) {
colIndex++;
continue;
}
record = record.trim();
String type = schemaMap.get(columns.get(colIndex)).get("type").toString();
MessageTypeConverter.addTypeValueToGroup(type, record, group, colIndex++);
switch (colIndex) {
case 0: // int32
group.add(colIndex, Integer.parseInt(record));
break;
case 1: // double
group.add(colIndex, Double.parseDouble(record));
break;
case 2: // string
group.add(colIndex, record);
break;
case 3:
// TODO: convert CSV string value to TIMESTAMP type (how?)
throw new NotImplementedException();
}
}
writer.write(group);
}
writer.close();
【问题讨论】:
-
仅供参考,从我在 this issue ticket 中读到的内容看来,Parquet 中的
INT96支持已被弃用。 -
@BasilBourque 是的,我看到了。不幸的是,parquet 文件的使用者正在强制执行这种 96 位时间戳编码,所以我需要弄清楚如何编写这种类型。
-
我对 Parquet 或 Hadoop 一无所知,因此无法发布答案。但一些提示可能会有所帮助:Java 原语仅限于 64 位数字,因此使用
BigInteger类来管理 96 位数字。Instant类和其他 java.time 类具有纳秒级分辨率。但是它们通过跟踪一对数字在内部工作:自纪元 1970-01-01T00:00:00Z 以来的整秒数加上小数秒的纳秒数。因此,您必须做一些数学运算才能将经过的总纳秒数输入一对数字。见Instant.ofEpochSecond&.plusNanos。
标签: java apache-spark hadoop parquet