【问题标题】:Can we update/Upsert a record in mongodb? data source is kafka我们可以在 mongodb 中更新/更新记录吗?数据源是kafka
【发布时间】:2020-04-02 08:09:29
【问题描述】:

我们可以在 mongodb 中更新/更新记录,但是是否有任何方法或函数可以直接在 mongodb 中更新或更新文档,并且源系统是 kafka,目标是 mongodb。

【问题讨论】:

  • 抱歉没有关注。在从 kafka 进程读取为 update/upsert 之后直接更新文档?你能扩展一下吗?可能会添加一些代码?

标签: apache-kafka apache-kafka-connect mongodb-kafka-connector


【解决方案1】:

是的,我们可以更新/更新数据。 对于更新,您必须在 Kafka 连接器中定义一个参数。 并将要更新记录的列列入白名单。属性如下:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
value.projection.list=tokenNumber
value.projection.type=whitelist
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

【讨论】:

  • 有效!太糟糕了,文档太难理解了(没有足够的例子,或者只是花了太多时间才能正确理解)......属性的名称在我看来只是令人困惑并且违反直觉我看到很多开发人员很难设置这个了。
  • 是的兄弟,你说得对,文档太难一口气理解了。
  • @nixxo_raa 您显示的配置中的 parametercolumn 是什么?
【解决方案2】:

我一直在苦苦挣扎,最后我得到了答案。我使用了以下 Mongodb sink connector

我在他们的文档上苦恼了一段时间后,终于找到了解决方案。

这是我正在使用的确切 mongodb sink 连接器配置

{
  "name": "mongodbsync",
  "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
  "topics": "alpha-foobar",
  "mongodb.connection.uri": "mongodb://localhost:27017/kafkaconnect?w=1&journal=true",
  "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy"

}

我在我的配置中将mongodb.writemodel.strategy 留空,所以它采用默认配置

我使用了来自同一连接器的 github 的以下文档的用例 2

我正在处理这种情况,将带有kafka-jdbc-source connect的mysql表数据传输到mongodb sink

上述策略也可以在official docs 中找到 如果您有任何疑问,请随时提出。谢谢

【讨论】:

  • 您建议的这种方法不能完全工作我收到一个名为 org.apache.kafka.connect.errors.DataException: 无法将密钥 456 转换为 BsonDocument 的错误。\n \tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:157)\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.clone(LazyBsonDocument.java:146) \n\tat com.mongodb.kafka.connect.sink.converter.SinkDocument.clone(SinkDocument.java:45)\n\tat
  • 你必须确保你有在 kafka 中激活的序列化程序
  • org.apache.kafka.connect.errors.DataException: 无法将密钥 456 转换为 BsonDocument。\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped (LazyBsonDocument.java:157)\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.clone(LazyBsonDocument.java:146)\n\tat com.mongodb.kafka.connect.sink.converter.SinkDocument .clone(SinkDocument.java:45)\n\tat 这是我得到@Shubh的错误
  • 让我分享我的连接器,以便您更好地理解
  • "config":{ "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector", "tasks.max":"1", "connection.uri":"mongodb: //xx.xx.xx:27017", "database":"topic1", "collection":"tag", "topics":"tag_update_new1", "key.converter": "org.apache.kafka.connect. storage.StringConverter", "value.converter":"io.confluent.connect.json.JsonSchemaConverter", "value.converter.schema.registry.url": "xx.xxx.xx.xx:8081",
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-05-12
  • 1970-01-01
  • 1970-01-01
  • 2014-11-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多