【问题标题】:Data type information lost when replication MySQL to PostgreSQL via Debezium通过 Debezium 将 MySQL 复制到 PostgreSQL 时数据类型信息丢失
【发布时间】:2022-01-15 08:03:33
【问题描述】:

我需要将 MySQL 数据库复制到 PostgreSQL 数据库。我选择了:

  • Debezium 连接
  • Avro 格式
  • 融合模式注册表
  • 卡夫卡

正在复制数据,但是,我丢失了一些架构信息。例如,mysql中datetime格式的列在Postgres中被复制为bigint,没有创建外键,也没有保留列的顺序(这很好)等等。

PostgreSQL 接收器连接器:

{
    "name": "jdbc-sink-dbt",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "2",
        "topics.regex": "test_(.*)",
        "connection.url": "jdbc:postgresql://dbt-postgres:5432/test?user=postgres&password=postgres",
        "transforms": "unwrap,removePrefix",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.removePrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.removePrefix.regex": "test_([^.]+)",
        "transforms.removePrefix.replacement": "$1",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}

MySQL 连接器:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "172.17.0.1",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.allowPublicKeyRetrieval": "true",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "test",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.test",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$2_$3",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}

Debezium 连接配置:

KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081

【问题讨论】:

    标签: mysql postgresql confluent-platform confluent-schema-registry debezium


    【解决方案1】:

    例如mysql中一个datetime格式的列被复制为bigint

    这是由于源端的 Debezium 连接器使用默认的 time.precision.mode。如果您查看documentation,您会注意到默认精度将datetime 列作为INT64 发出,这解释了为什么接收器连接器将内容写入为bigint

    您现在可以在源端将time.precision.mode 设置为connect,以便JDBC 接收器连接器可以正确解释这些值。

    没有创建外键

    这是意料之中的,请参阅Confluent GitHub Issue。目前,JDBC 接收器不具备支持在 JDBC 级别实现外键关系的能力。

    不保留列的顺序

    这也是意料之中的。没有预期的保证 Debezium 应该以与数据库中完全相同的顺序存储关系列(尽管我们这样做),并且 JDBC 接收器连接器不能保证在读取字段时保留字段的顺序发出的事件。如果 sink 连接器使用 HashMap 之类的容器来存储列名,那么顺序可能与源数据库有很大不同。


    如果有必要在目标系统中保留更高级别的关系元数据(例如外键和列顺序)以反映源系统,您可能需要查看单独的工具链来复制初始架构和通过某种类型的模式转储、转换和导入到目标数据库的关系,然后依赖 CDC 管道进行数据复制方面。

    【讨论】:

    • 您能否就“复制初始架构和关系的单独工具链”提出建议?
    猜你喜欢
    • 1970-01-01
    • 2021-08-14
    • 1970-01-01
    • 2017-06-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-11-17
    • 1970-01-01
    相关资源
    最近更新 更多