【问题标题】:How to update table schema when there is new Avro schema for Kafka data in Flink?当 Flink 中的 Kafka 数据有新的 Avro 模式时,如何更新表模式?
【发布时间】:2021-02-26 03:26:55
【问题描述】:

我们正在使用 Flink Table API 在 Flink 应用程序中消费一个 Kafka 主题。

当我们第一次提交应用程序时,我们首先从我们的自定义注册表中读取最新的架构。然后使用 Avro 模式创建一个 Kafka 数据流和表。我的数据序列化程序的实现类似于 Confluent 模式注册表,通过检查模式 ID 然后使用注册表。所以我们可以在运行时应用正确的模式。

但是,我不知道如何在不重新部署作业的情况下更新表架构并重新执行 SQL。有没有办法让后台线程检查架构更改,如果有,暂停当前执行,更新表架构并执行 SQL。

这对于向应用程序持续交付架构更改特别有用。我们已经进行了兼容性检查。

【问题讨论】:

    标签: apache-kafka apache-flink flink-sql


    【解决方案1】:

    TL;DR 在大多数情况下,您无需进行任何更改即可使其正常工作。

    在 Avro 中,存在读取器和写入器模式的概念。 Writer schema 是用于生成 Avro 记录的 schema,它被编码到有效负载中(在大多数情况下作为 id)。

    您的应用程序使用读取器架构来理解您的数据。如果您进行特定计算,则您正在使用 Avro 记录的一组特定字段。

    现在好的部分:如果它们与模式兼容,Avro 会透明地将写入模式转换为读取模式。因此,只要您的架构完全兼容,就有一种方法可以始终将编写器架构转换为您的读取架构。

    因此,如果您的记录架构在应用程序运行时在后台发生更改,DeserializationSchema 会获取新的写入架构并推断出到读取架构的新映射。您的查询不会发现任何变化。


    如果您真的想丰富应用程序中的模式,这种方法就不够用了;例如,您总是想添加一个字段calculated 并返回所有其他字段。然后不会选择新添加的字段,因为您的阅读器架构实际上发生了变化。在这种情况下,您需要重新启动或使用通用记录架构。

    【讨论】:

    • 我相信您对DataStream 的解释是正确的。 DeserializationSchema 将获取新模式以反序列化事件。但是,我的问题是针对Table API。当我们创建 Table API 时,它需要一个静态模式。我相信记录会被DeserializationSchema反序列化,但是新的字段对于提交SQL的用户是不可见的。我想,SELECT newfield from MyTable 会失败。
    • Table 也使用DeserializationSchema 并且一切都对Table API 有效。但是,在您的示例中,您实际上已经更改了阅读器架构。这只有在您重新注册表以更新阅读器架构时才有可能。
    猜你喜欢
    • 1970-01-01
    • 2018-04-17
    • 2018-12-16
    • 2019-08-19
    • 2017-03-14
    • 1970-01-01
    • 2020-08-31
    • 1970-01-01
    • 2016-07-18
    相关资源
    最近更新 更多