【问题标题】:Kafka connect elasticsearch sink extract and perform values from JSONKafka connect elasticsearch sink 从 JSON 中提取和执行值
【发布时间】:2022-12-19 11:25:54
【问题描述】:

我将 Elasticsearch Sink 连接器用于从 kafka 到 elasticsearch 的流数据,我有下一个问题。

我在kafka主题中有下一个结构document

Partition : 0 
Offset: 0
Key: 
Value: 
{
  "attributes": {
    "3": "Mike"
  }
}
Timestamp: 2022-11-03 19:03:34.866

对于这些数据,我的 elasticsearch 中有下一个索引模板

{
  "version": 1,
  "index_patterns": [
    "documents-*"
  ],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "cashier": {
        "type": "text"
      }
    }
  }
}

我有下一个配置 Elasticsearch Sink Connector

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "document, document-processing-error",
    "key.ignore": "true",
    "schema.ignore": "true",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "name": "elasticsearch-sink",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "flush.synchronously": "true",

    "transforms": "appendTimestampToIX",
    "transforms.appendTimestampToIX.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.appendTimestampToIX.topic.format": "${topic}-${timestamp}",
    "transforms.appendTimestampToIX.timestamp.format": "yyyy-MM-dd"
  }
}

在输出中,我的索引中有下一个数据document-2022-11-03

{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "document-2022-11-03",
                "_type": "_doc",
                "_id": "document-2022-11-03+0+0",
                "_score": 1.0,
                "_source": {
                    "attributes": {
                        "3": "Mike"
                    }
                }
            }
        ]
    }
}

这工作正常,但我需要对我的数据进行额外的转换,例如,如果在属性中我有键3,我需要替换这个字段并添加键cashier并将这个结构变异为带有文档随机ID的平面JSON,所以,最后输出我需要下一个结构(例如)

{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "document-2022-11-03",
                "_type": "_doc",
                "_id": "134DaBfWAE6AZUyKUAbjRksjXHTmP6hDxedGm4YhBnZW",
                "_score": 1.0,
                "_source": {
                      "cashier": "Mike"
                }
            }
        ]
    }
}

我厌倦了使用下一个配置替换字段,但这对我不起作用

"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "arrtubites.3:cashier"

我怎样才能做到这一点?

【问题讨论】:

    标签: elasticsearch apache-kafka apache-kafka-connect


    【解决方案1】:

    ReplaceField transform 不适用于映射或对象等嵌套属性,仅适用于两者的顶级字段。

    如果你想转换

    {
      "attributes": {
        "3": "Mike"
      }
    }
    

    进入

    {
      "cashier": "Mike"
    }
    

    然后,Kafka Streams 或 ksqlDB 是常见的建议(也就是在别处消费,并根据您要执行的逻辑生成新主题)。

    Logstash 也可能是一个选项,而不是 + Kafka Connect。

    【讨论】:

      猜你喜欢
      • 2021-09-08
      • 2021-07-15
      • 2020-01-07
      • 2019-06-21
      • 2017-03-26
      • 2017-04-04
      • 2022-01-26
      • 1970-01-01
      • 2020-05-28
      相关资源
      最近更新 更多