【问题标题】:Kafka Connect - MongoDB Source Connector - Pipeline Not WorkingKafka Connect - MongoDB 源连接器 - 管道不工作
【发布时间】:2021-11-23 03:38:46
【问题描述】:

我正在使用 MongoDB 源连接器设置 Kafka 连接器。

配置如下:

{
  "name": "MongoSourceConn",
  "config": {
    "name": "MongoSourceConn",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "value.converter.schema.registry.url":"http://schema-registry:8081",
    "publish.full.document.only": true,
    "topics": "test_topic",
    "connection.uri": "mongodb://siteUserAdmin:rstatools@rsgadcmgo5:27017",
    "database": "kafka",
    "collection": "test_topic",
    "pipeline": "[{ \"$match\": { \"$and\": [ {\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}, {\"jobStatus\": {\"$eq\": 5}} ] }} ]"
}
    "transforms":"dropPrefix",
    "transforms.dropPrefix.regex":"kafka.test_topic",
    "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.replacement":"test_topic"

如果我删除“管道”行,源连接器工作正常,但显然所有文档都会被推送到主题,这不是我想要的。

如果我添加回“管道”行,源连接器不会将任何消息推送到我的主题,我不明白为什么。 我错过了什么? 以下是我们 mongo 中的文档的样子:

{
    "_id" : ObjectId("61570b1d21589e03f8011235"),
    "jobId" : "04bba49d-098b-4d4c-adde-4578d31f20df",
    "jobStatus" : 5,
    "data" : null,
    "createdOn" : "2021-10-01 13:20:29.215691"
}

配置是通过 rest api 推送的,所以这就是为什么它具有带有所有转义字符 (\") 的“字典”外观。

谢谢。

【问题讨论】:

  • 有了pipeline 配置,连接器的状态是什么?是'RUNNING吗? Kafka Connect 工作器日志中是否有任何错误或警告?
  • 在确认创建连接器后,控制台正在向这些垃圾邮件发送这些垃圾邮件:[2021-10-01 14:22:48,064] INFO WorkerSourceTask{id=MongoSourceCn_-0} 正在刷新 0 条未完成的消息用于偏移提交(org.apache.kafka.connect.runtime.WorkerSourceTask)
  • 从日志中我觉得还不错,我认为问题正在酝酿中
  • 顺便说一下状态正在运行。
  • 您的文档没有operationType 字段,那么您希望匹配什么?显然,管道有效,因为您排除了所有没有 operationType"update""insert" 的事件

标签: mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector


【解决方案1】:

很明显,这条管道永远不会匹配,因为它当前包含{\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}

你提到你删除了它,但没有看到更多内容,不可能确切知道你是如何删除它的,所以那里可能出了点问题。

此外,您还不清楚获得数据后的确切外观。您在 Mongo 中显示一条消息,但可能会被包装到其他内容中(例如,由于变更流),因此字段 jobStatus 可能在顶层不可用,但最终嵌套。

我会推荐以下步骤:

  1. 在没有管道的情况下检查数据在 kafka 中的外观
  2. 从只做一件事的最简单的管道开始
  3. 在您能够以某种方式使用管道之前一直使用它
  4. 然后继续扩展逻辑直到你回到你想要的状态

我知道这些步骤有点笼统,但加上上面指出的内容,希望就足够了。

【讨论】:

    猜你喜欢
    • 2021-09-17
    • 2021-04-06
    • 2020-08-05
    • 2020-01-15
    • 2021-04-07
    • 2022-10-20
    • 2017-10-02
    • 2020-05-07
    • 2021-03-28
    相关资源
    最近更新 更多