【问题标题】:How to write TIMESTAMP logical type (INT96) to parquet, using ParquetWriter?如何使用 ParquetWriter 将 TIMESTAMP 逻辑类型(INT96)写入镶木地板?
【发布时间】:2019-07-06 12:53:57
【问题描述】:

我有一个工具,它使用org.apache.parquet.hadoop.ParquetWriter 将 CSV 数据文件转换为 parquet 数据文件。

目前,它只处理int32doublestring

我需要支持 parquet timestamp 逻辑类型(注释为 int96),我不知道该怎么做,因为我在网上找不到精确的规范。

这种时间戳编码 (int96) 似乎很少见,并且没有得到很好的支持。我在网上找到的规格细节很少。 This github README 声明:

保存为 int96 的时间戳由一天中的纳秒组成 (前 8 个字节)和儒略日(后 4 个字节)。

具体来说:

  1. MessageType 架构中的列使用哪个镶木地板 Type?我假设我应该使用原始类型PrimitiveTypeName.INT96,但我不确定是否有办法指定逻辑类型?
  2. 如何写入数据?即我以什么格式将时间戳写入组?对于 INT96 时间戳,我假设我必须编写一些二进制类型?

这是我的代码的简化版本,它演示了我正在尝试做的事情。具体来说,看一下“TODO”cmets,这是代码中与上述问题相关的两点。

List<Type> fields = new ArrayList<>();
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int32_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "double_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.STRING, "string_col", null));

// TODO: 
//   Specify the TIMESTAMP type. 
//   How? INT96 primitive type? Is there a logical timestamp type I can use w/ MessageType schema?
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT96, "timestamp_col", null)); 

MessageType schema = new MessageType("input", fields);

// initialize writer
Configuration configuration = new Configuration();
configuration.setQuietMode(true);
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
  new Path("output.parquet"),
  new GroupWriteSupport(),
  CompressionCodecName.SNAPPY,
  ParquetWriter.DEFAULT_BLOCK_SIZE,
  ParquetWriter.DEFAULT_PAGE_SIZE,
  1048576,
  true,
  false,
  ParquetProperties.WriterVersion.PARQUET_1_0,
  configuration
);

// write CSV data
CSVParser parser = CSVParser.parse(new File(csv), StandardCharsets.UTF_8, CSVFormat.TDF.withQuote(null));
ArrayList<String> columns = new ArrayList<>(schemaMap.keySet());
int colIndex;
int rowNum = 0;
for (CSVRecord csvRecord : parser) {
  rowNum ++;
  Group group = f.newGroup();
  colIndex = 0;
  for (String record : csvRecord) {
    if (record == null || record.isEmpty() || record.equals( "NULL")) {
      colIndex++;
      continue;
    }


    record = record.trim();
    String type = schemaMap.get(columns.get(colIndex)).get("type").toString();
    MessageTypeConverter.addTypeValueToGroup(type, record, group, colIndex++);

    switch (colIndex) {
      case 0: // int32
        group.add(colIndex, Integer.parseInt(record));
        break;
      case 1: // double
        group.add(colIndex, Double.parseDouble(record));
        break;
      case 2: // string
        group.add(colIndex, record);
        break;
      case 3:
        // TODO: convert CSV string value to TIMESTAMP type (how?)
        throw new NotImplementedException();
    }
  }
  writer.write(group);
}
writer.close();

【问题讨论】:

  • 仅供参考,从我在 this issue ticket 中读到的内容看来,Parquet 中的 INT96 支持已被弃用。
  • @BasilBourque 是的,我看到了。不幸的是,parquet 文件的使用者正在强制执行这种 96 位时间戳编码,所以我需要弄清楚如何编写这种类型。
  • 我对 Parquet 或 Hadoop 一无所知,因此无法发布答案。但一些提示可能会有所帮助:Java 原语仅限于 64 位数字,因此使用 BigInteger 类来管理 96 位数字。 Instant 类和其他 java.time 类具有纳秒级分辨率。但是它们通过跟踪一对数字在内部工作:自纪元 1970-01-01T00:00:00Z 以来的整秒数加上小数秒的纳秒数。因此,您必须做一些数学运算才能将经过的总纳秒数输入一对数字。见Instant.ofEpochSecond & .plusNanos

标签: java apache-spark hadoop parquet


【解决方案1】:
  1. INT96 时间戳使用 INT96 物理类型,没有任何逻辑类型,所以不要用任何注释。
  2. 如果您对 INT96 时间戳的结构感兴趣,请查看here。如果您想查看与此格式相互转换的示例代码,请查看this file from Hive

【讨论】:

  • 这很好,我将测试这段代码,看看它是否有效
  • 我还发现了这个:github.com/apache/spark/blob/… -- 用于编码 int96 时间戳的 scala 代码
  • 没有人不应该使用 INT96 编写时间戳。有记录和支持的时间戳类型。改用这些。
  • @blue OP 知道这一点。他写道:“不幸的是,parquet 文件的使用者正在强制执行这种 96 位时间戳编码,所以我需要弄清楚如何编写这种类型。”
  • 我遇到了另一个类似的问题,但要写入 null 值。你有机会看看我的新问题吗? stackoverflow.com/questions/55247724/…
【解决方案2】:

我想通了,使用 spark sql 中的this code 作为参考。

INT96 二进制编码分为两部分: 前 8 个字节是自午夜以来的纳秒 最后4个字节是Julian day

String value = "2019-02-13 13:35:05";

final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1);
final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1);
final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);

// Parse date
SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
cal.setTime(parser.parse(value));

// Calculate Julian days and nanoseconds in the day
LocalDate dt = LocalDate.of(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH)+1, cal.get(Calendar.DAY_OF_MONTH));
int julianDays = (int) JulianFields.JULIAN_DAY.getFrom(dt);
long nanos = (cal.get(Calendar.HOUR_OF_DAY) * NANOS_PER_HOUR)
        + (cal.get(Calendar.MINUTE) * NANOS_PER_MINUTE)
        + (cal.get(Calendar.SECOND) * NANOS_PER_SECOND);

// Write INT96 timestamp
byte[] timestampBuffer = new byte[12];
ByteBuffer buf = ByteBuffer.wrap(timestampBuffer);
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(nanos).putInt(julianDays);

// This is the properly encoded INT96 timestamp
Binary tsValue = Binary.fromReusedByteArray(timestampBuffer);

【讨论】:

  • 如果你有 julianDays 和 nanos 而不是打包自己的二进制文件,则可以使用 NanoTime。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-08-10
  • 2018-11-13
  • 2021-03-15
  • 2021-08-27
  • 2021-10-28
  • 1970-01-01
  • 2014-12-13
相关资源
最近更新 更多