【问题标题】:kafka connect sql server incremental changes from cdckafka 从 cdc 连接 sql server 增量更改
【发布时间】:2020-09-24 21:16:15
【问题描述】:

我对 Kafka 非常陌生(仅一周就开始在我的沙盒环境中阅读和设置)并尝试设置 SQL Server JDBC 连接器。 我已经按照 confluent 指南设置了 Confluent 社区,并使用 confluent-hub 安装了 io.debezium.connector.sqlserver.SqlServerConnector 我在 SQL Server 数据库和所需表上启用了 CDC,它工作正常。

我尝试过以下连接器(一次一个):

  1. io.debezium.connector.sqlserver.SqlServerConnector
  2. io.confluent.connect.jdbc.JdbcSourceConnector

两者都加载正常,连接器状态和任务运行正常,没有错误,如下所示:

这是我的 io.confluent.connect.jdbc.JdbcSourceConnector 配置

{
"name": "mssql-connector",
"config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "timestamp",
        "timestamp.column.name": "CreatedDateTime",
        "query": "select * from dbo.sampletable",
        "tasks.max": "1",
        "table.types": "TABLE",
        "key.converter.schemas.enable": "false",
        "topic.prefix": "data_",
        "value.converter.schemas.enable": "false",
        "connection.url": "jdbc:sqlserver://SQL2016:1433;databaseName=sampledb",
        "connection.user": "kafka",
        "connection.password": "kafkaPassword@789",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "poll.interval.ms": "5000",
        "table.poll.interval.ms": "120000"
    }
}

这是我的 io.confluent.connect.jdbc.JdbcSourceConnector 连接器

{
 "name": "mssql-connector",
 "config": {
     "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
     "tasks.max" : "1",
     "database.server.name" : "SQL2016",
     "database.hostname" : "SQL2016",
     "database.port" : "1433",
     "database.user" : "kafka",
     "database.password" : "kafkaPassword@789",
     "database.dbname" : "sampleDb",
     "database.history.kafka.bootstrap.servers" : "kafkanode1:9092",
     "database.history.kafka.topic": "schema-changes.sampleDb"
     }
 }

两个连接器都在创建主题中表的快照(意味着它最初会提取所有行) 但是当我对表“sampletable”(插入/更新/删除)进行更改时,这些更改不会被拉到 kafka。

有人可以帮我了解如何让 CDC 与 Kafka 一起工作吗?

谢谢

【问题讨论】:

  • 您运行的是什么 SQL Server 版本?另外,你能分享插入、删除等的示例有效负载吗?
  • 它是 SQL Server 2016 企业版。我将创建示例有效负载并将其发布在这里
  • 你检查过kafka连接日志吗?
  • 提供您为了启用 mssql cdc 功能所遵循的步骤也会很有用。

标签: apache-kafka apache-kafka-connect


【解决方案1】:

这似乎 100% 有效。我发布答案以防万一像我这样的人卡在 jdbc 源连接器上。

{
"name": "piilog-connector",
"config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "incrementing",       
        "value.converter.schemas.enable": "false",
        "connection.url": "jdbc:sqlserver://SQL2016:1433;databaseName=SIAudit",
        "connection.user": "kafka",
        "connection.password": "kafkaPassword@789",  
        "query": "select * from dbo.sampletable",
        "incrementing.column.name": "Id",
        "validate.non.null": false,
        "topic.prefix": "data_",
        "tasks.max": "1",
        "poll.interval.ms": "5000",
        "table.poll.interval.ms": "5000"
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-06-10
    • 2012-10-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-21
    • 1970-01-01
    相关资源
    最近更新 更多