【发布时间】:2021-06-09 08:36:39
【问题描述】:
我的 sink 配置文件包含以下配置 -
...
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "DELETESRC",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "DELETESRC",
"pk.mode": "record_key",
"pk.fields": "ID,C_NO",
"delete.enabled": "true",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081",
"transforms": "ValueToKey",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "ID,C_NO"
...
我可以通过键使用 upsert,但不能在 JDBC 接收器中使用删除模式。
我将主题DELETESRC 配置为cleanup.policy=compact,delete.retention.ms=0。
我创建了一个包含 4 列 (ID,CMP,SEG,C_NO) 的 KSQL 流,并使用插入到 KSQL 中的语句来推送数据。
INSERT INTO DELETESRC VALUES ('null','11','D','1','3')
INSERT INTO DELETESRC VALUES ('null','11','C','1','4')
INSERT INTO DELETESRC VALUES ('null','12','F','1','3')
但是当我在做INSERT INTO DELETESRC VALUES ('null','11','null','null','3') 时,接收器将表格更新为11,null,null,3。
我在堆栈溢出中查看了其他答案,但这些解决方案不起作用。
我在创建墓碑记录时是否有任何错误?
我在KSQL的insert语句中尝试了其他方法,但删除操作没有发生。
【问题讨论】:
标签: apache-kafka apache-kafka-connect