【Question title】: MongoDB Kafka Connector not generating the message key with the Mongo document id
【Posted】: 2019-12-04 17:57:36
【Question】:

I am using the beta release of the MongoDB Kafka Connector to publish from MongoDB to a Kafka topic.

Messages are produced to Kafka, but their key is null when it should be the document id.

Here is my connect-standalone configuration:

bootstrap.servers=xxx:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

And the MongoDB source properties:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb+srv://xxx
database=mydb
collection=mycollection

topic.prefix=someprefix
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=

Here is an example of a message's string value:

"{\"_id\": {\"_data\": \"xxx\"}, \"operationType\": \"replace\", \"clusterTime\": {\"$timestamp\": {\"t\": 1564140389, \"i\": 1}}, \"fullDocument\": {\"_id\": \"5\", \"name\": \"Some Client\", \"clientId\": \"someclient\", \"clientSecret\": \"1234\", \"whiteListedIps\": [], \"enabled\": true, \"_class\": \"myproject.Client\"}, \"ns\": {\"db\": \"mydb\", \"coll\": \"mycollection\"}, \"documentKey\": {\"_id\": \"5\"}}"

I tried using a transform to extract the id from the value, specifically from the documentKey field:

transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=documentKey

But it fails with an exception:

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String
    at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
    at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:79)
    at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)

Any ideas on how to generate the key with the document id?

【Comments】:

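  • How about key.converter=org.apache.kafka.connect.storage.StringConverter?
  • I just tried it, with the same result: key -> null
  • I see. Essentially, you have to extract documentKey, and then you also need to extract the inner _id field.
  • Exactly. I have been trying to do that with the ValueToKey transform, but no luck so far.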
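
As a rough illustration of the two-step extraction described in the comments (documentKey first, then its inner _id), here is a minimal Python sketch that works on the consumer side rather than inside Kafka Connect. It does not fix the null key produced by the connector. The function name `extract_document_id` and the shortened sample event are assumptions made for this example; the double decode reflects the sample value above, where the event is a JSON document wrapped in a JSON string.

```python
import json


def extract_document_id(raw_value: str):
    """Return documentKey._id from a change stream event read off the topic.

    Hypothetical helper for illustration only; not part of the connector.
    """
    decoded = json.loads(raw_value)
    # With JsonConverter and a String value, the event arrives as a JSON string
    # literal that itself contains JSON, so a second decode may be needed.
    if isinstance(decoded, str):
        decoded = json.loads(decoded)
    # Step 1: pull out documentKey. Step 2: pull out its inner _id.
    document_key = decoded.get("documentKey") or {}
    return document_key.get("_id")


# Shortened version of the sample event from the question.
event = {
    "_id": {"_data": "xxx"},
    "operationType": "replace",
    "fullDocument": {"_id": "5", "name": "Some Client"},
    "ns": {"db": "mydb", "coll": "mycollection"},
    "documentKey": {"_id": "5"},
}
# Encode twice to mimic the string-wrapped JSON seen on the topic.
sample_value = json.dumps(json.dumps(event))
document_id = extract_document_id(sample_value)
```

A consumer could use the returned id to re-key records into another topic, but that is a workaround outside the connector, not a replacement for a proper message key.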

Tags: mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector


【Solution 1】:

This follows from the exception that is thrown:

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String
    at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
    at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:79)
    at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)

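Unfortunately, the MongoDB connector you are using does not create a proper schema.

The connector creates each record with String as both the key schema and the value schema. Check this line: How record is created by connector. ValueToKey needs a Struct value to copy fields from, and here the value is a plain String, which is why you cannot apply the transformation to it.

【Solution 2】: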
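
To make the failure mode concrete, below is a simplified Python model of the check that ValueToKey performs. It is not the actual Kafka Connect code: `DataException` and `value_to_key` are stand-ins written for this example, schemas are modeled as plain strings, and a Struct is modeled as a dict. The point it shows is that a value carrying a String schema is rejected before any field can be copied.

```python
import json


class DataException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""


def value_to_key(value, value_schema, fields):
    """Simplified model of ValueToKey: copy the named fields of value into a key.

    value_schema is None for schemaless records, otherwise a type name such as
    "STRING" or "STRUCT" (an assumption of this sketch).
    """
    if value_schema is None:
        # Schemaless path: the value has to be a map.
        if not isinstance(value, dict):
            raise DataException(
                "Only Map objects supported in absence of schema, "
                f"found: {type(value).__name__}"
            )
    elif not isinstance(value, dict):
        # Schema path: the value has to be a Struct (modeled as a dict here).
        raise DataException(
            "Only Struct objects supported for [copying fields from value to key], "
            f"found: {type(value).__name__}"
        )
    return {name: value.get(name) for name in fields}


event_json = '{"documentKey": {"_id": "5"}}'

# What the connector emits: a String value with a String schema.
try:
    value_to_key(event_json, "STRING", ["documentKey"])
    error = None
except DataException as exc:
    error = str(exc)

# What the transform would need: a structured value.
key = value_to_key(json.loads(event_json), "STRUCT", ["documentKey"])
```

Even in the structured case the resulting key is the whole documentKey object, so the inner _id would still have to be extracted in a further step.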

This should be supported in version 1.3.0: https://jira.mongodb.org/browse/KAFKA-40

【Comments】:

  • Thanks for the reference, I will keep an eye on it.