【问题标题】:Kafka Connect Protobuf ConfigurationKafka Connect Protobuf 配置
【发布时间】:2021-01-30 02:44:34
【问题描述】:

我正在尝试创建一个使用 protobuf 值转换器的 kafka sink 连接器。我有一个使用 JSON 的配置版本,但是我现在需要更改它以使用 protobuf 消息。

我正在尝试使用以下请求创建连接器:

curl -X POST localhost:8083/connectors -H "Content-Type: application/json" -d '
{
    "name": "jdbc-sink-connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "topics": "TEST_PROTO",
      "connection.url": "${DB_URL}",
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "auto.create": true,
      "auto.evolve": true,
      "type": "sink",
      "connection.user": "{DB_USER}",
      "connection.password": "${DB_PASS}"
    }
}

这会给出以下 400 错误消息:

Invalid value io.confluent.connect.protobuf.ProtobufConverter for configuration value.converter: Class io.confluent.connect.protobuf.ProtobufConverter could not be found

我不太明白为什么我不能在此处包含此内容。从我所看到的文档来看,这是一个合适的值:https://docs.confluent.io/current/connect/userguide.html

有人可以帮忙吗?

【问题讨论】:

  • 您使用的是什么版本的 Confluent Platform?
  • @OneCricketeer 我使用的是 5.5.0 版,对于 kafka connect 我有:kafka-connect-jdbc-5.5.0.jar
  • 你也有用于 Protobuf 转换器的 JAR 吗?
  • @OneCricketeer 我不确定。我拥有的 JAR 如下:common-utils-5.5.0.jar、kafka-connect-jdbc-5.5.0.jar、postgresql-42.2.10.jar、slf4j-api-1.7.26.jar、jtds- 1.3.1.jar,sqlite-jdbc-3.25.2.jar。这是不是少了什么?
  • 这些是仅在 kafka-connect-jdbc 文件夹中的罐子。转换器存在于 schema-registry 文件夹周围(使用 protobuf 转换器时,您还需要模式注册表 url)

标签: apache-kafka protocol-buffers grpc apache-kafka-connect


【解决方案1】:

我猜在这种情况下,您缺少这些配置:

  • value.converter.schema.registry.url
  • key.converter.schema.registry.url
  • key.converter.schemas.enable
  • value.converter.schemas.enable

除了这些,我还尝试使用最新的 jdbc jar 和最新版本的融合平台。如果这不起作用,请告诉我。

【讨论】:

    【解决方案2】:

    为了清楚起见,在上面的答案中添加更多内容,您需要在配置 kafka-connect 时提及以下键的值。

    value.converter = "io.confluent.connect.protobuf.ProtobufConverter"
    key.converter = "io.confluent.connect.protobuf.ProtobufConverter"
    value.converter.schema.registry.url = URL (You should have schema registry service installed and all the producers registering the schema to the service registry before writing to the broker)
    key.converter.schema.registry.url = Can be the same URL as above
    key.converter.schemas.enable = true (if using Protobuf)
    value.converter.schemas.enable = true (if using Protobuf)
    

    要验证转换器是否加载成功,您可以查看 INFO 日志。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-10-16
      • 2020-10-23
      • 2022-11-11
      • 2021-06-03
      • 2018-06-16
      • 2017-05-27
      • 2021-07-01
      • 2020-01-06
      相关资源
      最近更新 更多