【发布时间】:2020-09-24 21:16:15
【问题描述】:
我对 Kafka 非常陌生(仅一周就开始在我的沙盒环境中阅读和设置)并尝试设置 SQL Server JDBC 连接器。 我已经按照 confluent 指南设置了 Confluent 社区,并使用 confluent-hub 安装了 io.debezium.connector.sqlserver.SqlServerConnector 我在 SQL Server 数据库和所需表上启用了 CDC,它工作正常。
我尝试过以下连接器(一次一个):
- io.debezium.connector.sqlserver.SqlServerConnector
- io.confluent.connect.jdbc.JdbcSourceConnector
两者都加载正常,连接器状态和任务运行正常,没有错误,如下所示:
这是我的 io.confluent.connect.jdbc.JdbcSourceConnector 配置
{
"name": "mssql-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp",
"timestamp.column.name": "CreatedDateTime",
"query": "select * from dbo.sampletable",
"tasks.max": "1",
"table.types": "TABLE",
"key.converter.schemas.enable": "false",
"topic.prefix": "data_",
"value.converter.schemas.enable": "false",
"connection.url": "jdbc:sqlserver://SQL2016:1433;databaseName=sampledb",
"connection.user": "kafka",
"connection.password": "kafkaPassword@789",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"poll.interval.ms": "5000",
"table.poll.interval.ms": "120000"
}
}
这是我的 io.confluent.connect.jdbc.JdbcSourceConnector 连接器
{
"name": "mssql-connector",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "SQL2016",
"database.hostname" : "SQL2016",
"database.port" : "1433",
"database.user" : "kafka",
"database.password" : "kafkaPassword@789",
"database.dbname" : "sampleDb",
"database.history.kafka.bootstrap.servers" : "kafkanode1:9092",
"database.history.kafka.topic": "schema-changes.sampleDb"
}
}
两个连接器都在创建主题中表的快照(意味着它最初会提取所有行) 但是当我对表“sampletable”(插入/更新/删除)进行更改时,这些更改不会被拉到 kafka。
有人可以帮我了解如何让 CDC 与 Kafka 一起工作吗?
谢谢
【问题讨论】:
-
您运行的是什么 SQL Server 版本?另外,你能分享插入、删除等的示例有效负载吗?
-
它是 SQL Server 2016 企业版。我将创建示例有效负载并将其发布在这里
-
你检查过kafka连接日志吗?
-
提供您为了启用 mssql cdc 功能所遵循的步骤也会很有用。
标签: apache-kafka apache-kafka-connect