【问题标题】:Kafka Connect - Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table is RECORD_KEY, but record key schema is missingKafka Connect - 原因:org.apache.kafka.connect.errors.ConnectException:表的 PK 模式为 RECORD_KEY,但缺少记录键模式
【发布时间】:2020-07-25 22:00:41
【问题描述】:

我有 jdbc-sink 用于将数据从 Kafka 传输到 Oracle 数据库。

我的连接出现此错误。

Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table 'orders' is RECORD_KEY, but record key schema is missing

我的接收器属性:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "MESSAGE_KEY",
    "insert.mode": "update ",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

我的 connect-avro-distributed.properties

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

我这样发送数据:

./bin/kafka-avro-console-producer \
--broker-list 10.0.0.0:9092 --topic orders \
--property parse.key="true" \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property key.separator="$" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "int"}]}' \
--property schema.registry.url=http://10.0.0.0:8081

我该如何解决这个问题?

提前致谢

【问题讨论】:

  • 也分享您的 kafka 连接配置。

标签: oracle jdbc apache-kafka apache-kafka-connect


【解决方案1】:

问题似乎出在您的有效负载和配置"pk.mode": "record_key" 上。

pk.mode 用于定义主键模式,您有以下配置选项:

  • none:未使用任何密钥
  • kafka:使用Kafka坐标作为PK
  • record_key: 使用记录键中的字段,可能是原语或结构。
  • record_value:使用记录值中的字段,必须是结构体。

在您的配置中,您使用的是record_key,这意味着 Kafka Connect 将从消息的键中获取字段并将其用作目标 Oracle 表中的主键。

虽然您还没有分享您的 Kafka Connect 工作人员的配置,但我猜您在其中缺少一些配置参数。

根据documentation

接收器连接器需要架构知识,因此您应该使用 合适的转换器,例如架构附带的 Avro 转换器 注册表或启用了模式的 JSON 转换器。卡夫卡唱片 键(如果存在)可以是原始类型或 Connect 结构,并且 记录值必须是一个 Connect 结构。从中选择的字段 连接结构必须是原始类型。如果主题中的数据 不是兼容的格式,实现自定义Converter 可能 有必要。


现在在您的情况下,问题似乎是"pk.fields",目前设置为"pk.fields": "MESSAGE_KEY"。在您的架构中,消息键定义为id。因此,以下应该可以解决问题:

"pk.fields": "id"

【讨论】:

  • json 数据我这样尝试:{"id":1}${"id": 1, "product": "Yağız Gülbahar", "quantity": 1453, "price": 1453 }
  • @ErsinGülbahar 更新您的问题并包含您的连接配置。;
  • 你说的连接配置是什么意思,我已经把我的 oracle sink 属性放好了
  • @ErsinGülbahar 另外,在你的真实配置中是"pk.fields": "MESSAGE_KEY" 实际上是"pk.fields": "id" 吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-12-03
  • 2019-02-03
  • 2021-04-13
  • 2022-06-26
  • 1970-01-01
  • 2019-02-19
  • 2022-01-17
相关资源
最近更新 更多