【发布时间】:2021-09-07 16:59:36
【问题描述】:
我使用debezium-ui repo来测试debezium mysql cdc特性,消息可以正常流式传输
进入kafka,创建mysql connect的请求体如下:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "dbzui-db-mysql",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysql",
"database.server.id": "184054",
"database.server.name": "inventory-connector-mysql",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "dbzui-kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory"
}
}
然后我需要将 kafka 消息放入我的团队使用的数据仓库雪花中。我创建了一个雪花接收器连接器来接收它,请求体如下:
{
"name": "kafka2-04",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max": 1,
"topics": "inventory-connector-mysql.inventory.orders",
"snowflake.topic2table.map": "inventory-connector-mysql.inventory.orders:tbl_orders",
"snowflake.url.name": "**.snowflakecomputing.com",
"snowflake.user.name": "kafka_connector_user_1",
"snowflake.private.key": "*******",
"snowflake.private.key.passphrase": "",
"snowflake.database.name": "kafka_db",
"snowflake.schema.name": "kafka_schema",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
"header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
"value.converter.schemas.enable":"true"
}
}
但是运行后,我的雪花中的数据下沉是这样的:data in snowflake,雪花表中的模式与mysql表不同。是我的接收器连接器配置不正确还是无法使用 SnowflakeSinkConnector 接收 debezium 生成的 kafka 数据。
【问题讨论】:
标签: apache-kafka snowflake-cloud-data-platform debezium