【问题标题】:How to solve Kafka Connect JSONConverter "Schema must contain 'type' field"如何解决 Kafka Connect JSONConverter“架构必须包含‘类型’字段”
【发布时间】:2020-04-19 10:08:59
【问题描述】:

我正在尝试向 JdbcSink 推送消息,消息如下

{
  "schema": {
    "type": "struct",
      "fields": [{
        "field": "ID",
        "type": {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      }, {
        "field": "STORE_DATE",
        "type": ["null", {
          "type": "long",
          "connect.version": 1,
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "logicalType": "timestamp-millis"
        }],
        "default": null
      }, {
        "field": "DATA",
        "type": ["null", "string"],
        "default": null
      }],
        "name": "KAFKA_STREAM"
  },
    "payload": {
      "ID": 17,
        "STORE_DATE": null,
          "DATA": "THIS IS TEST DATA"
    }
}

但它一直抛出错误Caused by: org.apache.kafka.connect.errors.DataException: Schema must contain 'type' field

这是当前使用的连接器配置

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "DEV_KAFKA_STREAM",
    "connection.url": "url",
    "connection.user": "user",
    "connection.password": "password",
    "insert.mode": "insert",
    "table.name.format": "KAFKA_STREAM",
    "pk.fields": "ID",
    "auto.create": "false",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
}

我不确定如何调试或如何找到根本原因,因为 json 确实有 type 字段

【问题讨论】:

标签: jdbc apache-kafka apache-kafka-connect


【解决方案1】:

据我所知,"long" 不是有效的架构类型。

你想要"int64"

JSON Schema source code

您可能还想删除工会。有一个 optional 键来指定可空字段

Kafka Connect JDBC sink connector not working

如果您在 java 中创建该 JSON,您应该使用 SchemaBuilder 和围绕两个 JSONNode 对象的 Envelope 类,以确保您正确构建有效负载

【讨论】:

  • 还是同样的错误`Schema must contain 'type' field`
  • 从这里尝试架构时是否收到相同的消息? stackoverflow.com/a/45940013/2308683 我会移除工会。您可以使用可选键设置可为空的字段
猜你喜欢
  • 2018-01-26
  • 2023-01-01
  • 1970-01-01
  • 2020-06-28
  • 2020-01-01
  • 1970-01-01
  • 2019-07-20
  • 2021-12-03
  • 2020-12-27
相关资源
最近更新 更多