【问题标题】:Kafka/Confluent CSV/SFTP connector and nested jsonKafka/Confluent CSV/SFTP 连接器和嵌套 json
【发布时间】:2021-12-11 08:36:56
【问题描述】:

我正在尝试读取 csv 文件并将其作为 json 发送到 kafka 主题。我从站点CSV Source Connector for Confluent Platform 的连接器示例开始,我正在使用融合本地安装。使用一个基本示例就可以了,我的 csv 示例数据是:

name,street,city
Homer Simpson,742 Evergreen Terrace,Springfield

我能够从这样的主题中读取 json:

{
  "name": "Homer Simpson",
  "street": "742 Evergreen Terrace",
  "number": "Springfield"
}

现在,我需要做的就是将这个 csv 行翻译成这个 json:

  {
    "name": "Homer Simpson",
    "address": {
      "street": "742 Evergreen Terrace",
      "number": "Springfield"
    }
  }

这是我正在使用的数据源连接器:

{
    "name": "NestedExample",
    "config": {
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "tasks.max": 1,
        "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
        "kafka.topic": "nested-topic",
        "cleanup.policy": "NONE",
        "behavior.on.error": "IGNORE",
        "input.path": "/home/project/opt/confluent-6.2.0/sftp/data",
        "error.path": "/home/project/opt/confluent-6.2.0/sftp/error",
        "finished.path": "/home/project/opt/confluent-6.2.0/sftp/finished",
        "input.file.pattern": ".*.csv",
        "sftp.username": "user",
        "sftp.password": "password",
        "sftp.host": "10.254.1.6",
        "sftp.port": "22",
        "csv.ignore.leading.whitespace": "true",
        "csv.first.row.as.header": "false",
        "csv.skip.lines": 1,
        "key.schema": "{\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : true,\"fieldSchemas\" : {\"material\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}",
        "value.schema": "{ \"name\" : \"com.example.users.User\", \"type\" : \"STRUCT\", \"isOptional\" : false, \"fieldSchemas\" : { \"name\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"street\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"number\" : { \"isOptional\" : false, \"type\" : \"STRING\" } } }"
    }
}

为了便于阅读,这是我的“value.schema”格式:

{
  "name" : "com.example.users.User",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "name" : {
      "isOptional" : false,
      "type" : "STRING"
    },
    "street" : {
      "isOptional" : false,
      "type" : "STRING"
    },
    "number" : {
      "isOptional" : false,
      "type" : "STRING"
    }
  }
}

【问题讨论】:

    标签: csv apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    您需要编写一个单一消息转换,以便一次修改多个字段

    或者,在使用 Connect 之后,使用 ksqlDB 或 Kafka Streams 将数据映射为您期望的输出格式数据

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-12-29
      • 2017-08-30
      • 2020-02-05
      • 1970-01-01
      • 2017-11-22
      • 1970-01-01
      • 2021-10-12
      相关资源
      最近更新 更多