【发布时间】:2021-08-21 16:45:34
【问题描述】:
我正在使用 SQL 连接器捕获表上的 CDC,我们只公开表上所有列的子集。该表上有两个唯一索引 A 和 B。两个索引都没有标记为 PRIMARY INDEX,但索引 A 在逻辑上是我们产品中的主键,也是我想与连接器一起使用的主键。索引 B 引用了我们不向 CDC 公开的列。索引 B 并没有真正在我们的产品中用作表的唯一键,它仅被标记为 UNIQUE,因为它被认为是唯一的,并且标记它给我们带来了性能优势。
这似乎导致了以下错误。我已经尝试使用连接器上的message.key.columns 选项来指定索引 A 作为该表的键,并希望忽略索引 B。但是,连接器似乎仍然想对索引 B 做一些事情
- 我该如何解决这种情况?
- 就我自己的理解而言,为什么连接器会关心引用 CDC 未公开的列的索引?
- 就我自己的理解而言,除了 CDC 表上配置的索引之外,连接器为什么还要关心任何索引,即参见 CDC.change_tables.index_name 文档
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:290)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: The column "mynoncdccolumn" is referenced as PRIMARY KEY, but a matching column is not defined in table "mydatabase.myschema.mytable"!
at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys$0(TableEditorImpl.java:105)
at java.base/java.util.ArrayList.removeIf(ArrayList.java:1702)
at java.base/java.util.ArrayList.removeIf(ArrayList.java:1690)
at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:101)
at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:254)
at io.debezium.connector.sqlserver.SqlServerConnection.getTableSchemaFromTable(SqlServerConnection.java:428)
at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.getCdcTablesToQuery(SqlServerStreamingChangeEventSource.java:378)
at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:121)
我的连接器配置
{
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.user": "myuser",
"database.password": "mspassword",
"database.dbname": "mydb",
"database.hostname": "mysqlserverinstance",
"database.history.kafka.bootstrap.servers": "b-1.mycluster:9092,b-2.mycluster:9092,b-3.mycluster:9092",
"database.history.kafka.topic": "myhistorytopic",
"database.server.name": "myserver",
"message.key.columns": "myschema.mytable:KeyColumn1,KeyColumn2;"
}
表定义
CREATE TABLE [myschema].[mytable]
(
[MyKeyColumn1] [int] NOT NULL,
[MyKeyColumn2] [int] NOT NULL,
[Data] [varchar] (255) NOT NULL,
[UniqueColumn1] [timestamp] NOT NULL
)
GO
CREATE UNIQUE NONCLUSTERED INDEX [IndexB] ON [myschema].[mytable] ([UniqueColumn1])
GO
CREATE UNIQUE NONCLUSTERED INDEX [IndexA] ON [myschema].[mytable] ([MyKeyColumn1], [MyKeyColumn2])
GO
【问题讨论】:
标签: sql-server debezium