【问题标题】:Kafka Connect without schema, only JSONKafka Connect 没有模式,只有 JSON
【发布时间】:2021-09-25 00:32:18
【问题描述】:

我想使用带 JSON 且不带架构的 JDBC 接收器连接器。 他们写道(source):

如果您需要在没有 Schema Registry 的情况下使用 JSON 来连接数据,您可以 可以使用 Kafka 支持的 JsonConverter。下面的例子 显示了添加到 配置:

key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false

当属性 key.converter.schemas.enable 和 value.converter.schemas.enable 设置为 true,键或值是 不被视为纯 JSON,而是作为复合 JSON 对象 包含内部架构和数据。当这些是 为源连接器启用,架构和数据都在 复合 JSON 对象。当这些为接收器连接器启用时, 架构和数据是从复合 JSON 对象中提取的。 请注意,此实现从不使用架构注册表。

当属性 key.converter.schemas.enable 和 value.converter.schemas.enable 设置为 false(默认),仅 数据在没有模式的情况下传递。这减少了有效载荷 不需要架构的应用程序的开销。

我配置了连接器:

{
  "name": "noschemajustjson",
  "config": {
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "schemas.enable": "false",
    "name": "noschemajustjson",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "config.action.reload": "restart",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topics": "testconnect2",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "********",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "table.name.format": "utp",
    "auto.create": "false",
    "auto.evolve": "false"
  }
}

但我仍然收到错误:

原因:org.apache.kafka.connect.errors.ConnectException: Sink 连接器“noschemajustjson2”配置为 'delete.enabled=false' 和 'pk.mode=none' 因此需要 具有非空结构值和非空结构模式的记录,但是 发现记录在 (topic='testconnect2',partition=0,offset=0,timestamp=1626416739697) 具有 HashMap 值和空值模式。

那么我应该怎么做才能强制 Connect 在没有架构的情况下工作(只有纯 JSON)?

【问题讨论】:

    标签: apache-kafka-connect


    【解决方案1】:

    我想将 JDBC 接收器连接器与 JSON 和无架构

    一起使用

    您不能这样做 - JDBC Sink 连接器流向关系数据库,并且关系数据库具有模式:-D JDBC Sink 连接器因此需要为数据提供模式。

    根据您的数据来自哪里,您有不同的选择。

    • 如果从 Kafka Connect 提取,请使用支持架构(Avro、Protobuf、JSON 架构)的转换器
    • 如果它是由您控制的应用程序生成的,请让该应用程序使用架构(Avro、Protobuf、JSON 架构)序列化该数据
    • 如果它来自您无法控制的地方,那么您需要预处理主题以添加显式架构并将其写入新主题,然后由 JDBC Sink 连接器使用。李>

    参考资料和资源:

    【讨论】:

      猜你喜欢
      • 2021-04-13
      • 2019-11-16
      • 1970-01-01
      • 2019-02-03
      • 2021-12-03
      • 2021-12-05
      • 2019-12-09
      • 1970-01-01
      • 2019-06-16
      相关资源
      最近更新 更多