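[Posted]: 2021-12-11 08:36:56
[Question]:
I am trying to read a CSV file and send it to a Kafka topic as JSON. I started from the connector example on the "CSV Source Connector for Confluent Platform" page, and I am using a local Confluent installation. The basic example works fine. My sample CSV data is: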
name,street,city
Homer Simpson,742 Evergreen Terrace,Springfield
I can read the following JSON from the topic:
{
  "name": "Homer Simpson",
  "street": "742 Evergreen Terrace",
  "number": "Springfield"
}
What I need now is to turn that CSV row into this JSON instead:
{
  "name": "Homer Simpson",
  "address": {
    "street": "742 Evergreen Terrace",
    "number": "Springfield"
  }
}
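To make the target shape concrete, here is a minimal Python sketch of the reshaping itself, done outside Kafka Connect (for example in a downstream consumer). The function name `nest_address` and its parameters are hypothetical, chosen only for illustration; this is not a feature of the SFTP CSV connector.

```python
import json


def nest_address(flat, nested_key="address", fields=("street", "number")):
    """Return a copy of `flat` with `fields` moved under `nested_key`.

    Hypothetical helper: the names and defaults are assumptions made
    for this example, not part of any Kafka Connect API.
    """
    # Keep every top-level field that is not being moved.
    nested = {k: v for k, v in flat.items() if k not in fields}
    # Group the selected fields under the new nested object.
    nested[nested_key] = {k: flat[k] for k in fields if k in flat}
    return nested


# The flat record as it currently arrives on the topic.
flat = {
    "name": "Homer Simpson",
    "street": "742 Evergreen Terrace",
    "number": "Springfield",
}

print(json.dumps(nest_address(flat), indent=2))
```

The helper copies rather than mutates the input, so the original flat record stays available for comparison.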
This is the source connector configuration I am using:
{
  "name": "NestedExample",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "tasks.max": 1,
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "kafka.topic": "nested-topic",
    "cleanup.policy": "NONE",
    "behavior.on.error": "IGNORE",
    "input.path": "/home/project/opt/confluent-6.2.0/sftp/data",
    "error.path": "/home/project/opt/confluent-6.2.0/sftp/error",
    "finished.path": "/home/project/opt/confluent-6.2.0/sftp/finished",
    "input.file.pattern": ".*.csv",
    "sftp.username": "user",
    "sftp.password": "password",
    "sftp.host": "10.254.1.6",
    "sftp.port": "22",
    "csv.ignore.leading.whitespace": "true",
    "csv.first.row.as.header": "false",
    "csv.skip.lines": 1,
    "key.schema": "{\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : true,\"fieldSchemas\" : {\"material\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}",
    "value.schema": "{ \"name\" : \"com.example.users.User\", \"type\" : \"STRUCT\", \"isOptional\" : false, \"fieldSchemas\" : { \"name\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"street\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"number\" : { \"isOptional\" : false, \"type\" : \"STRING\" } } }"
  }
}
For readability, here is my "value.schema" in formatted form:
{
  "name" : "com.example.users.User",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "name" : {
      "isOptional" : false,
      "type" : "STRING"
    },
    "street" : {
      "isOptional" : false,
      "type" : "STRING"
    },
    "number" : {
      "isOptional" : false,
      "type" : "STRING"
    }
  }
}
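Because "value.schema" has to be passed as an escaped JSON string inside the connector JSON, one way to avoid hand-escaping is to build it from the readable form. The following is a minimal Python sketch under that assumption; `build_connector_config` is a hypothetical helper, and only a few of the config keys shown earlier are included to keep it short.

```python
import json

# The readable schema, written as a plain Python dict.
value_schema = {
    "name": "com.example.users.User",
    "type": "STRUCT",
    "isOptional": False,
    "fieldSchemas": {
        field: {"isOptional": False, "type": "STRING"}
        for field in ("name", "street", "number")
    },
}


def build_connector_config(schema):
    """Embed `schema` as a JSON string in a (partial) connector config.

    Hypothetical helper for illustration; the remaining connector
    settings from the full configuration are omitted here.
    """
    return {
        "name": "NestedExample",
        "config": {
            "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
            "kafka.topic": "nested-topic",
            # First dumps: the schema becomes a string value.
            "value.schema": json.dumps(schema),
        },
    }


config = build_connector_config(value_schema)
# Second dumps: serializing the whole config escapes the inner quotes.
print(json.dumps(config, indent=2))
```

Parsing the "value.schema" string back with `json.loads` should give the original dict, which is a quick check that the escaping round-trips.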
[Discussion]:
Tags: csv apache-kafka apache-kafka-connect confluent-platform