【发布时间】:2021-11-23 03:38:46
【问题描述】:
我正在使用 MongoDB 源连接器设置 Kafka 连接器。
配置如下:
{
"name": "MongoSourceConn",
"config": {
"name": "MongoSourceConn",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"value.converter.schema.registry.url":"http://schema-registry:8081",
"publish.full.document.only": true,
"topics": "test_topic",
"connection.uri": "mongodb://siteUserAdmin:rstatools@rsgadcmgo5:27017",
"database": "kafka",
"collection": "test_topic",
"pipeline": "[{ \"$match\": { \"$and\": [ {\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}, {\"jobStatus\": {\"$eq\": 5}} ] }} ]"
}
"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"kafka.test_topic",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.replacement":"test_topic"
如果我删除“管道”行,源连接器工作正常,但显然所有文档都会被推送到主题,这不是我想要的。
如果我添加回“管道”行,源连接器不会将任何消息推送到我的主题,我不明白为什么。 我错过了什么? 以下是我们 mongo 中的文档的样子:
{
"_id" : ObjectId("61570b1d21589e03f8011235"),
"jobId" : "04bba49d-098b-4d4c-adde-4578d31f20df",
"jobStatus" : 5,
"data" : null,
"createdOn" : "2021-10-01 13:20:29.215691"
}
配置是通过 rest api 推送的,所以这就是为什么它具有带有所有转义字符 (\") 的“字典”外观。
谢谢。
【问题讨论】:
-
有了
pipeline配置,连接器的状态是什么?是'RUNNING吗? Kafka Connect 工作器日志中是否有任何错误或警告? -
在确认创建连接器后,控制台正在向这些垃圾邮件发送这些垃圾邮件:[2021-10-01 14:22:48,064] INFO WorkerSourceTask{id=MongoSourceCn_
-0} 正在刷新 0 条未完成的消息用于偏移提交(org.apache.kafka.connect.runtime.WorkerSourceTask) -
从日志中我觉得还不错,我认为问题正在酝酿中
-
顺便说一下状态正在运行。
-
您的文档没有
operationType字段,那么您希望匹配什么?显然,管道有效,因为您排除了所有没有operationType或"update"或"insert"的事件
标签: mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector