【发布时间】:2021-04-06 11:26:04
【问题描述】:
我正在使用以下源连接器配置来过滤和读取来自 MongoDB 的状态为“PENDING”的特定记录。只需要轮询所有插入/更新为 PENDING 状态的记录。如果排除管道,源连接器能够轮询所有记录。有人可以帮我理解如何解决这个问题,还有没有办法知道轮询已经完成,就像批处理作业完成一样,以协调 kafka 消费者的另一个进程?
name=mongo-source-demo
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://username:password@hostname:27017
database=test
collection=mongoDBtest
topic.prefix=mongodb.connector
poll.max.batch.size=1000
poll.await.time.ms=100000
publish.full.document.only=true
pipeline=[{"$match": { "Status" : "PENDING" }},{"$project":{"_id":1,"fullDocument":1}} ]
【问题讨论】:
-
pipeline=[{"$match": { "Status" : {"PENDING"} }},{"$project":{"_id":1,"fullDocument":1}} ]也不应该在 {} 之间吗? -
不需要。管道适用于指南针,但不适用于连接器。即使仅此一项也不起作用 {"$match": { "Status" :"PENDING" }}
标签: mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector