【问题标题】:ExtractField and Parse JSON in kafka-connect sink在 kafka-connect 接收器中提取字段和解析 JSON
【发布时间】:2019-04-10 04:45:09
【问题描述】:

我有一个 mongodb->kafka connect->elasticsearch 的 kafka-connect 流程,可以端到端发送数据,但是有效负载文档是 JSON 编码的。这是我的源 mongodb 文档。

{
  "_id": "1541527535911",
  "enabled": true,
  "price": 15.99,
  "style": {
    "color": "blue"
  },
  "tags": [
    "shirt",
    "summer"
  ]
}

这是我的 mongodb 源连接器配置:

{
  "name": "redacted",
  "config": {
    "connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",
    "databases": "redacted.redacted",
    "initial.import": "true",
    "topic.prefix": "redacted",
    "tasks.max": "8",
    "batch.size": "1",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
    "key.serializer.schemas.enable": false,
    "value.serializer.schemas.enable": false,
    "compression.type": "none",
    "mongo.uri": "mongodb://redacted:27017/redacted",
    "analyze.schema": false,
    "schema.name": "__unused__",
    "transforms": "RenameTopic",
    "transforms.RenameTopic.type":
      "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.RenameTopic.regex": "redacted.redacted_Redacted",
    "transforms.RenameTopic.replacement": "redacted"
  }
}

在 elasticsearch 中,它最终看起来像这样:

{
  "_index" : "redacted",
  "_type" : "kafka-connect",
  "_id" : "{\"schema\":{\"type\":\"string\",\"optional\":true},\"payload\":\"1541527535911\"}",
  "_score" : 1.0,
  "_source" : {
    "ts" : 1541527536,
    "inc" : 2,
    "id" : "1541527535911",
    "database" : "redacted",
    "op" : "i",
    "object" : "{ \"_id\" : \"1541527535911\", \"price\" : 15.99,
      \"enabled\" : true, \"tags\" : [\"shirt\", \"summer\"],
      \"style\" : { \"color\" : \"blue\" } }"
  }
}

我想使用 2 个单消息转换:

  1. ExtractField 抓取object,这是一串JSON
  2. 可以将该 JSON 解析为一个对象,或者只让普通的 JSONConverter 处理它,只要它在 elasticsearch 中的结构正确即可。

我尝试在我的接收器配置中仅使用 ExtractField 来执行此操作,但我看到 kafka 记录了此错误

kafka-connect_1       | org.apache.kafka.connect.errors.ConnectException:
Bulk request failed: [{"type":"mapper_parsing_exception",
"reason":"failed to parse", 
"caused_by":{"type":"not_x_content_exception",
"reason":"Compressor detection can only be called on some xcontent bytes or
compressed xcontent bytes"}}]

这是我的 elasticsearch sink 连接器配置。在这个版本中,我的工作正常,但我必须编写自定义 ParseJson SMT。它运行良好,但如果有更好的方法或通过某种内置东西(转换器、SMT、任何可行的方法)的组合来做到这一点,我很乐意看到。

{
  "name": "redacted",
  "config": {
    "connector.class":
      "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "batch.size": 1,
    "connection.url": "http://redacted:9200",
    "key.converter.schemas.enable": true,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "schema.ignore": true,
    "tasks.max": "1",
    "topics": "redacted",
    "transforms": "ExtractFieldPayload,ExtractFieldObject,ParseJson,ReplaceId",
    "transforms.ExtractFieldPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldPayload.field": "payload",
    "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldObject.field": "object",
    "transforms.ParseJson.type": "reaction.kafka.connect.transforms.ParseJson",
    "transforms.ReplaceId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceId.renames": "_id:id",
    "type.name": "kafka-connect",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

【问题讨论】:

  • 您可以添加您的 ElasticSearch 配置吗?看起来你正在使用 StringConverter
  • @cricket_007 我添加了我的 ElasticSearch 接收器配置。让我知道是否有更好的方法使用不同的value.converter 或其他东西。
  • 您不使用 JSONConverter 是否有原因?因为负载实际上是 JSON ......你也可以添加你的 Mongo 源连接器吗?
  • 我想我将它用于 value.converter,对吧?你在谈论key.converter吗? (我对 kafka-connect 非常陌生,目前完全被所有这些配置所淹没,所以如果我的问题在假设清楚/完全理解的情况下甚至没有意义,我深表歉意)
  • 我有同样的问题,我猜这是由 mongo 源连接器确定的,它移交给它的转换器(无论是什么)String(恰好包含 mongo JSON 记录)而不是Map 或 Kafka struct。到目前为止,我得出的结论是,使用自定义 JSON 解析器 SMT 的解决方案似乎是最好的。如果您同意分享,请提供 ParseJson 来源的链接。

标签: elasticsearch apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

我不确定您的 Mongo 连接器。我不认识类或配置...大多数人可能使用Debezium Mongo connector

不过我会这样设置

"connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",

"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
"key.serializer.schemas.enable": false,
"value.serializer.schemas.enable": true,

schemas.enable 很重要,这样内部 Connect 数据类就可以知道如何与其他格式进行转换。

然后,在 Sink 中,您再次需要使用 JSON DeSerializer(通过转换器),以便它创建完整的对象而不是纯文本字符串,正如您在 Elasticsearch 中看到的 (@987654324 @)。

"connector.class":
  "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true

如果这不起作用,那么您可能必须提前在 Elasticsearch 中手动创建索引映射,以便它知道如何实际解析您发送给它的字符串

【讨论】:

  • 感谢您的回答!我已经使用 2 种 ExtractField 组合尝试了您的设置。我最终还是得到了这个错误或 NullPointerException,我认为这是因为虽然现在有一个包装 JSON 对象的模式,但没有payload.object JSON 编码字符串的模式。基本上我认为我需要 2 个模式。
  • 您是否偶然使用了 Confluent 模式注册表?我实际上对 JSONConverter 没有太多经验,不知道它对模式信息的持久性有多好,但是 Avro 的定义更加严格,而 Elastic 连接器更喜欢以这种方式定义模式。或者,至少在 Mongo 连接器中,也许可以尝试 analyze.schema=true 以便您可以以某种方式将模式作为记录的一部分持久化
猜你喜欢
  • 2020-05-13
  • 2020-11-01
  • 2020-05-29
  • 2019-07-03
  • 2019-06-17
  • 2019-02-23
  • 2016-12-26
  • 1970-01-01
  • 2022-12-19
相关资源
最近更新 更多