【问题标题】:Flink json serialization timezone issueFlink json序列化时区问题
【发布时间】:2020-11-19 20:15:05
【问题描述】:

我使用JsonRowSerializationSchema 将 Flink 的 Row 序列化为 JSON。 I SQL 时间戳序列化存在时区问题。

  val row = new Row(1)
  row.setField(0, new Timestamp(0))

  val tableSchema = TableSchema
    .builder
    .field("c", DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp]))
    .build

  val serializer = JsonRowSerializationSchema.builder()
    .withTypeInfo(tableSchema.toRowType)
    .build()

  println(new String(serializer.serialize(row)))

{"c":"1969-12-31T16:00:00Z"}

我看到它使用PST(本地时区)来解释时间戳,然后输出UTC(请参阅输出中的Z

如果我执行TimeZone.setDefault(TimeZone.getTimeZone("UTC")),那么它会打印{"c":"1970-01-01T00:00:00Z"}。我的时间戳是为 UTC 时间创建的,我希望 Flink 将它们解释为 UTC。

我正在检查 Flink 的实现,以下两种方法在起作用。

    private JsonNode convertLocalDateTime(ObjectMapper mapper, JsonNode reuse, Object object) {
        return mapper.getNodeFactory()
            .textNode(RFC3339_TIMESTAMP_FORMAT.format((LocalDateTime) object));
    }

    private JsonNode convertTimestamp(ObjectMapper mapper, JsonNode reuse, Object object) {
        Timestamp timestamp = (Timestamp) object;
        return convertLocalDateTime(mapper, reuse, timestamp.toLocalDateTime());
    }

看起来实现是硬编码的,有没有办法告诉Flink使用UTC而不改变系统时间?

【问题讨论】:

    标签: apache-flink flink-streaming flink-sql


    【解决方案1】:

    java.sql.Timestamp 非常有问题,因为它取决于时区。这就是为什么我们在新的 Table/SQL 类型系统中将其替换为新的 java.time.* 类。

    对于过时的实现,我们建议所有 Flink JVM 都配置在 UTC 时区。

    对于表/SQL,我们使用新的org.apache.flink.formats.json.JsonRowDataSerializationSchema,但这适用于内部数据结构。我建议只复制JsonRowSerializationSchema 的源代码并根据需要实现格式。或者直接使用 Jackson 库,这将完全避免处理 TypeInformation

    【讨论】:

      猜你喜欢
      • 2020-08-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-05-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多