【发布时间】:2020-12-30 15:35:54
【问题描述】:
我将 Debezium 连接器用于带有 Kafka 连接的 postgres。
对于连接器写入 Kafka 的插入行事件,我需要有关哪些列是主键,哪些不是主键的信息。有没有办法做到这一点?
粘贴在 Kafka 中生成的示例插入事件:
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "int32",
"optional": false,
"field": "bucket_type"
}
],
"optional": true,
"name": "postgresconfigdb.config.alert_configs.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "int32",
"optional": false,
"field": "bucket_type"
}
],
"optional": true,
"name": "postgresconfigdb.config.alert_configs.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "postgresconfigdb.config.alert_configs.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 1100,
"bucket_type": 10
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "postgresconfigdb",
"ts_ms": 1599830887858,
"snapshot": "true",
"db": "configdb",
"schema": "config",
"table": "alert_configs",
"txId": 2139888,
"lsn": 379356048,
"xmin": null
},
"op": "r",
"ts_ms": 1599830887859,
"transaction": null
}
}
这里表格中的列是'id'和'bucket_type',它们的值在json-path payload->after中报告。
在特定于列的“可选”布尔字段中有关于不为空的列的信息,但是没有关于哪些列是主键的信息。 (本例中为id)
【问题讨论】:
-
手动可以吗?您可以使用 Kafka Connect SMT 将 PK 字段提取到消息密钥中,然后在消费者端使用它
标签: postgresql apache-kafka-connect confluent-platform amazon-aurora debezium