【问题标题】:How to import MS Sql Server tables to KSQL with Kafka connect如何使用 Kafka 连接将 MS Sql Server 表导入 KSQL
【发布时间】:2019-03-15 10:30:18
【问题描述】:

您好,我正在尝试将远程 SQL Server 上存在的所有表导入 KSQL 主题 这是我的文件属性

connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
name=sqlservertest
tasks.max=1
initial.database=$$DATABASE
connection.url=jdbc:sqlserver://$$IP:1433;databaseName=$$DATABASE;user=$$USER;
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
topic.prefix=sqlservertest
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
mode=bulk
auto.create=true
auto.evolve=true

比我做的

confluent load sqlservertest -d /opt/kakfkaconf/sqlservertest.properties

在日志中

confluent log connect -f

它显示

[2018-10-10 14:18:43,856] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)

它运行正常但没有导入任何东西,主题保持为空

confluent status sqlservertest 
{
  "name": "sqlservertest",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.132.0.2:8083"
  },
  "tasks": [],
  "type": "source"
}

我也改变了属性

name=mssql
connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
tasks.max=2
initial.database=$$DB
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
change.tracking.tables=$$SCHEMA.$$TABLE
auto.create=true
auto.evolve=true
topic.prefix=$$DB
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

但我收到此错误

[2018-10-10 15:06:09,216] ERROR Exception thrown while querying for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE} (io.confluent.connect.cdc.mssql.QueryService:94)
org.apache.kafka.connect.errors.DataException: Exception thrown while getting metadata for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE}
        at io.confluent.connect.cdc.CachingTableMetadataProvider.tableMetadata(CachingTableMetadataProvider.java:64)
        at io.confluent.connect.cdc.mssql.QueryService.queryTable(QueryService.java:108)
        at io.confluent.connect.cdc.mssql.QueryService.processTables(QueryService.java:92)
        at io.confluent.connect.cdc.mssql.QueryService.run(QueryService.java:67)
        at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:60)
        at com.google.common.util.concurrent.Callables$3.run(Callables.java:95)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
        at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
        ... 6 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
        at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:259)
        at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1547)
        ... 11 more

【问题讨论】:

  • 我正在检查日志融合日志连接,它正确连接到服务器 Ms Sql 但没有其他任何事情发生
  • {{kafka-connect-url}}/connectors/{{connector-name}}/status 上的 POST 请求的结果是什么?
  • 融合状态 sqlservertest { "name": "sqlservertest", "connector": { "state": "RUNNING", "worker_id": "10.132.0.2:8083" }, "tasks": [],“类型”:“来源”}
  • 我用更多细节更新了我的问题
  • 你的表、数据库、模式等的名称是什么?

标签: sql-server apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

我找到了这个错误的真正原因,Kafka 连接器正在使用 MS Sql server 2012 中存在的函数,特别是函数中的 IFF 和布尔比较

select IFF(1>2,'OK','KO');
select (1>2) as bool;

不能在 MS Sql 2008 上运行

真正的原因是 Conflunet MSSQL 连接器仅适用于 MS SQL Server 2012 及更高版本,而我正在运行 2008 版

我反编译了库 kafka-connect-cdc-mssql 并调整了 sql 代码以兼容 sqlserver 2008,现在它可以工作了。

也许我会将它推送到 github 以供所有人使用

【讨论】:

  • 嘿,我有相同的错误和相同版本的 mssql。你能在github上分享你的代码吗?谢谢!!!
  • 您好,如果需要我们可以私聊。我被这个阻止了。
猜你喜欢
  • 2018-08-30
  • 2010-11-08
  • 1970-01-01
  • 1970-01-01
  • 2020-03-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多