【发布时间】:2021-01-30 02:44:34
【问题描述】:
我正在尝试创建一个使用 protobuf 值转换器的 kafka sink 连接器。我有一个使用 JSON 的配置版本,但是我现在需要更改它以使用 protobuf 消息。
我正在尝试使用以下请求创建连接器:
curl -X POST localhost:8083/connectors -H "Content-Type: application/json" -d '
{
"name": "jdbc-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "TEST_PROTO",
"connection.url": "${DB_URL}",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"auto.create": true,
"auto.evolve": true,
"type": "sink",
"connection.user": "{DB_USER}",
"connection.password": "${DB_PASS}"
}
}
这会给出以下 400 错误消息:
Invalid value io.confluent.connect.protobuf.ProtobufConverter for configuration value.converter: Class io.confluent.connect.protobuf.ProtobufConverter could not be found
我不太明白为什么我不能在此处包含此内容。从我所看到的文档来看,这是一个合适的值:https://docs.confluent.io/current/connect/userguide.html
有人可以帮忙吗?
【问题讨论】:
-
您使用的是什么版本的 Confluent Platform?
-
@OneCricketeer 我使用的是 5.5.0 版,对于 kafka connect 我有:kafka-connect-jdbc-5.5.0.jar
-
你也有用于 Protobuf 转换器的 JAR 吗?
-
@OneCricketeer 我不确定。我拥有的 JAR 如下:common-utils-5.5.0.jar、kafka-connect-jdbc-5.5.0.jar、postgresql-42.2.10.jar、slf4j-api-1.7.26.jar、jtds- 1.3.1.jar,sqlite-jdbc-3.25.2.jar。这是不是少了什么?
-
这些是仅在 kafka-connect-jdbc 文件夹中的罐子。转换器存在于 schema-registry 文件夹周围(使用 protobuf 转换器时,您还需要模式注册表 url)
标签: apache-kafka protocol-buffers grpc apache-kafka-connect