【问题标题】:Flink table sink doesn't work with debezium-avro-confluent sourceFlink 表接收器不适用于 debezium-avro-confluent 源
【发布时间】:2021-05-31 15:14:24
【问题描述】:

我正在使用 Flink SQL 从 Kafka 读取 debezium avro 数据,并将其作为 parquet 文件存储在 S3 中。这是我的代码,

import os

from pyflink.datastream import StreamExecutionEnvironment, FsStateBackend
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, \
    ScalarFunction

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
# start a checkpoint every 12 s
exec_env.enable_checkpointing(12000)
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TABLE = 'source'
KAFKA_TOPIC = os.environ['KAFKA_TOPIC']
KAFKA_BOOTSTRAP_SERVER = os.environ['KAFKA_BOOTSTRAP_SERVER']
OUTPUT_TABLE = 'sink'
S3_BUCKET = os.environ['S3_BUCKET']
OUTPUT_S3_LOCATION = os.environ['OUTPUT_S3_LOCATION']

ddl_source = f"""
       CREATE TABLE {INPUT_TABLE} (
           `event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
           `id` BIGINT,
           `price` DOUBLE,
           `type` INT,
           `is_reinvite` INT
       ) WITH (
           'connector' = 'kafka',
           'topic' = '{KAFKA_TOPIC}',
           'properties.bootstrap.servers' = '{KAFKA_BOOTSTRAP_SERVER}',
           'scan.startup.mode' = 'earliest-offset',
           'format' = 'debezium-avro-confluent',
           'debezium-avro-confluent.schema-registry.url' = 'http://kafka-production-schema-registry:8081'
       )
   """

ddl_sink = f"""
       CREATE TABLE {OUTPUT_TABLE} (
           `event_time` TIMESTAMP,
           `id` BIGINT,
           `price` DOUBLE,
           `type` INT,
           `is_reinvite` INT
       ) WITH (
            'connector' = 'filesystem',
            'path' = 's3://{S3_BUCKET}/{OUTPUT_S3_LOCATION}',
            'format' = 'parquet'
       )
   """

t_env.sql_update(ddl_source)
t_env.sql_update(ddl_sink)

t_env.execute_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT * 
    FROM {INPUT_TABLE}
""")

当我提交作业时,我收到以下错误消息,

pyflink.util.exceptions.TableException: Table sink 'default_catalog.default_database.sink' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, price, type, is_reinvite, timestamp])

我正在使用 Flink 1.12.1。源工作正常,我已经使用接收器中的“打印”连接器对其进行了测试。这是在表接收器中使用“打印”连接器时从任务管理器日志中提取的示例数据集,

-D(2021-02-20T17:07:27.298,14091764,26.0,9,0)
-D(2021-02-20T17:07:27.298,14099765,26.0,9,0)
-D(2021-02-20T17:07:27.299,14189806,16.0,9,0)
-D(2021-02-20T17:07:27.299,14189838,37.0,9,0)
-D(2021-02-20T17:07:27.299,14089840,26.0,9,0)
-D(2021-02-20T17:07:27.299,14089847,26.0,9,0)
-D(2021-02-20T17:07:27.300,14189859,26.0,9,0)
-D(2021-02-20T17:07:27.301,14091808,37.0,9,0)
-D(2021-02-20T17:07:27.301,14089911,37.0,9,0)
-D(2021-02-20T17:07:27.301,14099937,26.0,9,0)
-D(2021-02-20T17:07:27.302,14091851,37.0,9,0)

如何使我的表接收器与文件系统连接器一起工作?

【问题讨论】:

    标签: apache-kafka apache-flink avro parquet debezium


    【解决方案1】:

    会发生什么:

    • 当接收到 Debezium 记录时,Flink 通过根据主键添加、删除和抑制 Flink 行来更新逻辑表。
    • 唯一可以处理这类信息的接收器是那些具有按键更新概念的接收器。 Jdbc 就是一个典型的例子,在这种情况下,Flink 可以直接将“带有键 foo 的 Flink 行已更新为 bar”的概念转换为“带有键 foo 的 JDBC 行应该使用值 bar 更新”之类的。 filesystem sink 不支持这种操作,因为文件只能追加。

    另见Flink documentation on append and update queries

    在实践中,为了进行转换,我们首先必须确定我们想要在这个仅附加文件中拥有什么。

    如果我们想要在任何时候更新 id 时在文件中包含每个项目的最新版本,那么据我所知,可以先将其转换为流,然后使用FileSink。请注意,在这种情况下,结果包含一个布尔值,表示该行是更新还是删除,我们必须决定如何在结果文件中显示此信息。

    注意:我使用了另一个 CDC example from the Flink SQL cookbook 来重现类似的设置:

    
    // assuming a Flink retract table of claims build from a CDC stream:
    tableEnv.executeSql("" +
            " CREATE TABLE accident_claims (\n" +
            "    claim_id INT,\n" +
            "    claim_total FLOAT,\n" +
            "    claim_total_receipt VARCHAR(50),\n" +
            "    claim_currency VARCHAR(3),\n" +
            "    member_id INT,\n" +
            "    accident_date VARCHAR(20),\n" +
            "    accident_type VARCHAR(20),\n" +
            "    accident_detail VARCHAR(20),\n" +
            "    claim_date VARCHAR(20),\n" +
            "    claim_status VARCHAR(10),\n" +
            "    ts_created VARCHAR(20),\n" +
            "    ts_updated VARCHAR(20)" +
            ") WITH (\n" +
            "  'connector' = 'postgres-cdc',\n" +
            "  'hostname' = 'localhost',\n" +
            "  'port' = '5432',\n" +
            "  'username' = 'postgres',\n" +
            "  'password' = 'postgres',\n" +
            "  'database-name' = 'postgres',\n" +
            "  'schema-name' = 'claims',\n" +
            "  'table-name' = 'accident_claims'\n" +
            " )"
    );
    
    // convert it to a stream
    Table accidentClaims = tableEnv.from("accident_claims");
    DataStream<Tuple2<Boolean, Row>> accidentClaimsStream = tableEnv
        .toRetractStream(accidentClaims, Row.class);
    
    // and write to file
    final FileSink<Tuple2<Boolean, Row>> sink = FileSink
        // TODO: adapt the output format here:
        .forRowFormat(new Path("/tmp/flink-demo"),
                            (Encoder<Tuple2<Boolean, Row>>) (element, stream) -> stream.write((element.toString() + "\n").getBytes(StandardCharsets.UTF_8)))
        .build();
    ordersStreams.sinkTo(sink);
    
    streamEnv.execute();
    

    请注意,在转换过程中,您会获得一个布尔值,告诉您该行是该事故索赔的新值,还是该索赔的删除。我的基本FileSink 配置只是在输出中包含该布尔值,但如何处理删除将视具体情况而定。

    文件中的结果如下所示:

    head /tmp/flink-demo/2021-03-09--09/.part-c7cdb74e-893c-4b0e-8f69-1e8f02505199-0.inprogress.f0f7263e-ec24-4474-b953-4d8ef4641998
    
    (true,1,4153.92,null,AUD,412,2020-06-18 18:49:19,Permanent Injury,Saltwater Crocodile,2020-06-06 03:42:25,IN REVIEW,2021-03-09 06:39:28,2021-03-09 06:39:28)
    (true,2,8940.53,IpsumPrimis.tiff,AUD,323,2019-03-18 15:48:16,Collision,Blue Ringed Octopus,2020-05-26 14:59:19,IN REVIEW,2021-03-09 06:39:28,2021-03-09 06:39:28)
    (true,3,9406.86,null,USD,39,2019-04-28 21:15:09,Death,Great White Shark,2020-03-06 11:20:54,INITIAL,2021-03-09 06:39:28,2021-03-09 06:39:28)
    (true,4,3997.9,null,AUD,315,2019-10-26 21:24:04,Permanent Injury,Saltwater Crocodile,2020-06-25 20:43:32,IN REVIEW,2021-03-09 06:39:28,2021-03-09 06:39:28)
    (true,5,2647.35,null,AUD,74,2019-12-07 04:21:37,Light Injury,Cassowary,2020-07-30 10:28:53,REIMBURSED,2021-03-09 06:39:28,2021-03-09 06:39:28)
    

    【讨论】:

    • Svend,非常感谢您用您的超级有用的回答为我指明了正确的方向。
    • 嗨 Svend,我试图澄清您建议的第一个解决方案中的几件事。我在代码中添加了一个 tumble Window 查询并得到以下错误,“pyflink.util.exceptions.TableException:窗口聚合只能在时间属性列上定义,但遇到 TIMESTAMP(3)。”
    • 然后我在查询中包含水印字段并得到以下错误,“pyflink.util.exceptions.TableException:GroupWindowAggregate 不支持使用节点 TableSourceScan 产生的更新和删除更改(table= [[default_catalog, default_database, source, watermark=[CAST($4):TIMESTAMP(3)]]], fields=[id, price, type, is_reinvite, timestamp])"
    • 嗨,维杜拉。我认为我的回答有问题,我可能对由 Debezium 数据形成的表格的确切形状有误。你能在没有 TUMBLE 的情况下尝试相同的 SELECT 语句吗?如果您可以在某处上传一些 Debezium 测试数据,我很乐意在本周末运行一些测试并尝试解决这个问题。
    • 嗨@ViduraMudalige,我更新了我的答案。我实际上并没有设法使它与 SQL 语句一起工作。也许这是可行的,我只是不知道怎么做,对不起。在更新的答案中,我们获取所有新值的流并将其写入文件。
    猜你喜欢
    • 2021-05-11
    • 1970-01-01
    • 2020-01-01
    • 2021-11-30
    • 2020-08-28
    • 2021-10-23
    • 2018-10-01
    • 2019-08-08
    • 2022-08-02
    相关资源
    最近更新 更多