【发布时间】:2022-01-15 08:03:33
【问题描述】:
我需要将 MySQL 数据库复制到 PostgreSQL 数据库。我选择了:
- Debezium 连接
- Avro 格式
- 融合模式注册表
- 卡夫卡
正在复制数据,但是,我丢失了一些架构信息。例如,mysql中datetime格式的列在Postgres中被复制为bigint,没有创建外键,也没有保留列的顺序(这很好)等等。
PostgreSQL 接收器连接器:
{
"name": "jdbc-sink-dbt",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "2",
"topics.regex": "test_(.*)",
"connection.url": "jdbc:postgresql://dbt-postgres:5432/test?user=postgres&password=postgres",
"transforms": "unwrap,removePrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.removePrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.removePrefix.regex": "test_([^.]+)",
"transforms.removePrefix.replacement": "$1",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
MySQL 连接器:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "172.17.0.1",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.allowPublicKeyRetrieval": "true",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "test",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.test",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$2_$3",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Debezium 连接配置:
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
【问题讨论】:
标签: mysql postgresql confluent-platform confluent-schema-registry debezium