【Question title】: How to get Kafka message headers in a Kafka Connect sink connector with MongoDB
【Posted】: 2021-01-26 07:02:06
【Question description】:

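How can I use Kafka Connect to retrieve the incoming headers of a Kafka message, so that the MongoDB Sink Connector stores them in MongoDB as additional data fields?

I have a Kafka topic, "PROJECT_EXAMPLE_TOPIC". As you can see, I have already been able to save the message timestamp, the incoming message data, and the Mongo document creation/update dates.

I guess there is a function somewhere to extract the headers.

Example Kafka value: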

  // incoming kafka value
  {
    "msgId" : "exampleId"
  }
  1. How do I get the original header header_foo?

  //expected example
  {
  
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" : "exampleId",
    "message_header_foo" : "header_foo_value"
   }


  2. How do I get all the Kafka headers?
  //expected example
  {
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" : "exampleId",
    "message_headers" : {
        "header_001" : "header_001_value",
        "header_002" : "header_002_value",
        ...
        "header_x" : "header_x_value"
    }
  }


Here is my configuration:

{
    "name": "sink-mongo-PROJECT-EXAMPLE",
    "config": {
      "topics": "PROJECT_EXAMPLE_TOPIC",
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max": "1",
  
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
      "key.converter.schemas.enable": "false",
      "key.converter.basic.auth.credentials.source": "USER_INFO",
      "key.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
  
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
      "value.converter.schemas.enable": "false",
      "value.converter.basic.auth.credentials.source": "USER_INFO",
      "value.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
      "connection.uri": "PROJECT_REFERENTIAL_MONGO_URL",
      "database": "PROJECT_DB_NAME",
      "collection": "EXAMPLE",
      "max.num.retries": "3",
      "retries.defer.timeout": "5000",
  
  
      "key.projection.type": "none",
      "key.projection.list": "",
  
      "field.renamer.mapping": "[]",
      "field.renamer.regex": "[]",
  
      "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
      "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
      "value.projection.list": "msgId",
      "value.projection.type": "whitelist",
      "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
    
      "delete.on.null.values": "false",
    
      "max.batch.size": "0",
      "rate.limiting.timeout": "0",
      "rate.limiting.every.n": "0",
    
    
      "change.data.capture.handler": "",
  
      "errors.tolerance": "all",
      "errors.log.enable":true,
      "errors.log.include.messages":true,

      "transforms": "InsertSource,InsertTopic,InsertTimestamp",
      "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertSource.static.field": "message_source",
      "transforms.InsertSource.static.value": "mongo_connector",
      "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertTopic.topic.field": "message_topic",
      "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertTimestamp.timestamp.field": "message_timestamp"

    }
  }

【Question discussion】:

  • Looking for the same!

Tags: apache-kafka apache-kafka-connect mongodb-kafka-connector


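【Solution 1】:

This is an old question, but there is a third-party Single Message Transform (SMT) that can copy headers into fields on the record's key or value: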

https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/HeaderToField.html

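It will not let you grab all headers at once: you have to specify, by name, each header you want to extract, together with its type.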
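A minimal sketch of how this transform might be appended to the existing `transforms` chain from the question, assuming the kafka-connect-transform-common plugin is installed on the Connect worker. The class name `com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value` and the `header.mappings` format (`<header name>:<header type>[:<field name>]`) reflect my reading of the linked documentation and should be verified there; the mapping `header_foo:STRING:message_header_foo` is an illustrative value chosen to match the first expected document, not something taken from the original post.

```json
"transforms": "InsertSource,InsertTopic,InsertTimestamp,HeaderToField",
"transforms.HeaderToField.type": "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
"transforms.HeaderToField.header.mappings": "header_foo:STRING:message_header_foo"
```

If the syntax matches the plugin version you install, a record carrying a `header_foo` header should end up with a `message_header_foo` field in the MongoDB document. Headers not listed in `header.mappings` are not copied, so this covers the first case but not the "all headers" case.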

【Discussion】:
