【问题标题】:Flink SQL (V 1.12.1) unable to read debezium changelog from Kinesis streamFlink SQL (V 1.12.1) 无法从 Kinesis 流中读取 debezium 更新日志
【发布时间】:2021-05-25 02:52:47
【问题描述】:

我在从 Kinesis 流中读取 Debezium 更改日志时遇到了一些问题。我能否深入了解如何使用 Flink SQL 解析更改日志事件。

以下是我尝试通过 Flink SQL 客户端解析流的尝试

Flink SQL> CREATE TABLE test_table (
>   city_id INT,
>   country_id INT,
>   city STRING,
>   last_update timestamp
> )
> WITH (
>   'connector' = 'kinesis',
>   'stream' = 'kinesis.sakila.city',
>   'aws.region' = 'us-east-1',
>   'scan.stream.initpos' = 'TRIM_HORIZON',
>   'format' = 'debezium-json'
> );
[INFO] Table has been created.

Flink SQL> select * from test_table;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Kinesis consumer does not support DeserializationSchema that implements deserialization with a Collector. Unsupported DeserializationSchema: org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema

【问题讨论】:

    标签: apache-flink debezium flink-sql


    【解决方案1】:

    Flink 文档中有一个表格显示which connectors support each of the formats。您会看到 Kinesis 连接器不支持 Debezium 更改日志格式。

    【讨论】:

    • 非常感谢大卫。我可能应该深入研究文档。您知道是否有计划在近期内支持 Kinesis 的变更日志格式?
    • 我不知道计划是什么,也不知道涉及什么。您可以在用户邮件列表中询问。
    • 您可以在issues.apache.org/jira/browse/FLINK-20060 中提出您的意见,以获得更高的优先级。
    猜你喜欢
    • 1970-01-01
    • 2020-12-29
    • 2017-04-16
    • 1970-01-01
    • 2022-07-16
    • 1970-01-01
    • 2019-09-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多