[Posted]: 2019-01-29 03:13:12
[Question]:
Using: confluent-5.1.0
Sink configuration:
curl -X POST \
http://localhost:8083/connectors \
-H 'cache-control: no-cache' \
-H 'content-type: application/json' \
-d '{
"name": "dbz-sink-connector-1",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "dbauditt4",
"topic.index.map": "our3.platform.business:plat_index",
"topics.regex":"our3.platform.business",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"group.id":"plot",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"transforms": "timestamp_convertor",
"transforms.timestamp_convertor.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp_convertor.target.type": "string",
"transforms.timestamp_convertor.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
"transforms.timestamp_convertor.field":"data.ts_ms"
}
}';
A sample message in the Kafka topic is:
{
"data": {
"before": null,
"after": {
"Id": 331458,
"business_id": 532334,
"sms_opted": 1
},
"source": {
"version": "0.7.5",
"name": "our3",
"server_id": 810143323,
"ts_sec": 1548661255,
"gtid": null,
"file": "mysql-bin-changelog.001786",
"pos": 1719980,
"row": 0,
"snapshot": false,
"thread": 11674162,
"db": "platform",
"table": "business"
},
"op": "c",
"ts_ms": 1548661255851
}
}
The connector throws a NullPointerException.
java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at org.apache.kafka.connect.transforms.TimestampConverter.inferTimestampType(TimestampConverter.java:422) at
Can anyone help me? What exactly am I doing wrong?
[Discussion]:
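- In Kafka Connect you cannot reference nested struct fields with `.`; you have to extract the values first, for example with the org.apache.kafka.connect.transforms.Flatten transformation. I also think that, without a schema, you may run into problems with values that are maps.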
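The point made in the comment above can be illustrated with a minimal Python sketch. This is not Kafka Connect code: the `flatten` helper is a hypothetical stand-in that only mimics how a flattening step joins nested key names, and the `_` delimiter and the resulting `data_ts_ms` field name are assumptions chosen for illustration. The `transforms` dict shows how the transform settings from the question might be adapted to run Flatten before TimestampConverter; it has not been tested against a real Connect worker, and the schemaless caveat from the comment still applies.

```python
import json

# Trimmed copy of the sample record value from the question. With
# schemas.enable=false the converter yields nested maps, modelled here as dicts.
record = {
    "data": {
        "before": None,
        "after": {"Id": 331458, "business_id": 532334, "sms_opted": 1},
        "op": "c",
        "ts_ms": 1548661255851,
    }
}


def flatten(value, delimiter="_", prefix=""):
    """Hypothetical stand-in for a flattening step: join nested keys with a delimiter."""
    flat = {}
    for key, item in value.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(item, dict):
            flat.update(flatten(item, delimiter, name))
        else:
            flat[name] = item
    return flat


# A top-level lookup of "data.ts_ms" finds nothing, because the only
# top-level key is "data". A missing value like this is consistent with
# the NullPointerException reported in the question.
missing = record.get("data.ts_ms")

# After flattening, the timestamp is an ordinary top-level field.
flat = flatten(record)

# Assumed adaptation of the question's settings: Flatten runs first, then
# TimestampConverter targets the flattened field name.
transforms = {
    "transforms": "flatten,timestamp_convertor",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_",
    "transforms.timestamp_convertor.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.timestamp_convertor.target.type": "string",
    "transforms.timestamp_convertor.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
    "transforms.timestamp_convertor.field": "data_ts_ms",
}

if __name__ == "__main__":
    print("top-level lookup of 'data.ts_ms':", missing)
    print("flattened keys:", sorted(flat))
    print(json.dumps(transforms, indent=2))
```

If something along these lines were used, the entries of `transforms` would replace the `transforms*` keys in the `config` object posted to the connectors endpoint, leaving the other settings unchanged.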
Tags: apache-kafka apache-kafka-connect