【问题标题】:How to rename/replace a field within a struct in Kafka-connect SMT?如何重命名/替换 Kafka-connect SMT 结构中的字段?
【发布时间】:2020-11-20 15:05:28
【问题描述】:

description for replaceField SMT 说它可以Filter or rename fields within a Struct or Map. 但是我找不到任何用于替换或重命名结构中的字段的工作示例。

我有一个主题中的数据正在使用Kafka Connect Elasticsearch Sink 写入 ElasticSearch。为简单起见,假设数据格式如下所示。

{
  'ID':22, 
  'ITEM': 'Shampoo'
  'USER':{
    'NAME': 'jon', 
    'AGE':25
   }
}

因此,如果我尝试重命名/替换 USER.NAMEUSER.AGE,我将如何在连接器中进行配置? (我已经在 ksqldb 中编写了所有内容)。这是我当前的配置,我将 ITEM 重命名为 productID 重命名为 id

CREATE SINK CONNECTOR ELASTIC_SINK WITH (
    'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'connection.url' = 'http://host.docker.internal:9200',
    'type.name' = '_doc',
    'topics' = 'ELASTIC_TOPIC',
    'key.ignore' = 'false',
    'schema.ignore' = 'true',
    'transforms' = 'RenameField',
    'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.RenameField.renames' = 'ITEM:product,ID:id',
);

【问题讨论】:

标签: apache-kafka apache-kafka-connect ksqldb


【解决方案1】:

看看现有的SO问答:https://stackoverflow.com/a/56601093/4778022

您可以提供要重命名的字段的路径,部分以句点分隔。

CREATE SINK CONNECTOR ELASTIC_SINK WITH (
    'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'connection.url' = 'http://host.docker.internal:9200',
    'type.name' = '_doc',
    'topics' = 'ELASTIC_TOPIC',
    'key.ignore' = 'false',
    'schema.ignore' = 'true',
    'transforms' = 'RenameField',
    'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.RenameField.renames' = 'USER.NAME:name,ITEM:product,ID:id',
);

【讨论】:

    猜你喜欢
    • 2020-08-14
    • 2020-11-27
    • 2020-11-28
    • 2020-11-26
    • 2017-02-06
    • 2021-07-17
    • 2019-05-29
    • 2020-11-01
    • 1970-01-01
    相关资源
    最近更新 更多