【发布时间】:2021-04-10 23:03:06
【问题描述】:
我正在使用 MongoDB Kafka 连接器,我能够成功创建和更新记录,但无法删除连接器配置中的记录我正在使用此配置
{
"name": "mongo-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "test",
"connection.uri": "mongodb://mongo1:27018,mongo2:27019,mongo3:27020",
"database": "accounting",
"collection": "test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schemas.enable": "false",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy",
"document.id.strategy.partial.key.projection.list":"productId",
"document.id.strategy.partial.key.projection.type":"ALLOWLIST",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
"delete.on.null.values": "true",
"transforms":"WrapKey",
"transforms.WrapKey.type":"org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.WrapKey.field":"_id"
}
}
如果我发送空记录,文档更新为空而不删除任何想法,则无法删除记录?
【问题讨论】:
-
你如何发送你的空记录?您可以在问题中添加您的消息示例吗?
-
@AmitSingh 这是我的消息 { key: "898900", value: { productId: 1, productName: null }, headers: { type: "test-value", subject: "test-value ",correlationId: '测试', } }
-
这是您要删除的邮件?
-
是的,我也将 productid 和 Name 都设为 null,但运气不好,它会用 null 更新两条记录
-
对于删除,您的值应设置为 null,因此您的键:“898900”和值:null
标签: apache-kafka apache-kafka-connect mongodb-kafka-connector