【问题标题】:Kafka Sink Connector fails: Schema not found; error code: 40403Kafka Sink 连接器失败:找不到架构;错误代码:40403
【发布时间】:2019-06-11 08:10:06
【问题描述】:

我有一个具有以下配置的接收器连接器

{
    "name": "sink-test-mariadb-MY_TOPIC",
    "config": { 
                "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
                "tasks.max":"10",
                "topics":"MY_TOPIC",
                "connection.url":"jdbc:mariadb://localhost:3306/myschema?user=myuser&password=mypass",
                "auto.create":"false",
                "auto.evolve":"true",
                "table.name.format":"MY_TABLE",
                "pk.mode":"record_value",
                "pk.fields":"ID",
                "insert.mode":"upsert",
                "transforms":"ExtractField",
                "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
                "transforms.ExtractField.field":"data"
        }
}

一段时间后,连接器的所有任务都失败并出现以下错误:

{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: MY_TOPIC
                at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
            Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 802
            Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
                at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
                at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
                at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
                at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
                at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)",
    "id": 0,
    "worker_id": "localhost:8083"
}

连接器设法将主题与数据库同步,但它突然无缘无故地失败了。我也很确定架构在那里。它的主题出现在调用模式注册APIlocalhost:8081/subjects返回的列表中

[
  ...
  MY_TOPIC-value
  ...
]

【问题讨论】:

  • GET /schemas/ids/803 的输出是什么? (ref)
  • @RobinMoffatt { "error_code": 40403, "message": "Schema not found" }
  • curl -s "http://localhost:8081/subjects/MY_TOPIC-value/versions/" 怎么样?
  • @RobinMoffatt 感谢您的及时回复。我得到的回复是[ 3 ]

标签: apache-kafka avro confluent-schema-registry


【解决方案1】:

Kafka 主题上的消息使用与架构注册表中不同版本的架构进行序列化。也许它是由将模式写入不同模式注册表或在不同环境中的工具生成的?为了能够反序列化它,Kafka Connect 需要能够检索该主题的 Kafka 消息开头的魔术字节中的模式 ID。

架构不存在于您的架构注册表中,如下所示:

GET /schemas/ids/803
 { "error_code": 40403, "message": "Schema not found" }

您可以通过查看

来检查您所拥有的架构的 ID
curl -s "http://localhost:8081/subjects/MY_TOPIC-value/versions/3/"|jq '.id'

【讨论】:

  • 感谢您的回答!我明白你的意思,但在我的情况下它没有意义。我昨天遇到了同样的问题,为了解决这个问题,我停止了连接器,删除了主题和主题。然后我重新创建了主题并在其中推送了相同的消息,然后启动了接收器连接器,它工作正常,直到今天出现了同样的错误。
  • 我的赌注是在其他地方写同一主题的另一个进程。
  • @GiorgosMyrianthous 还要确保 key.converter 未设置为 Avro 并尝试自己进行 ID 查找(假设您有非空键)
  • @cricket_007 我很确定密钥是字符串格式,因为我正在使用第三方工具来生成主题中的数据(无论如何都不支持 avro 密钥)
  • @GiorgosMyrianthous 好的。我的观点是Avro schema for id 802 似乎相当大,如果您的主题数量较少并且假设 ID 是连续的。可能发生的情况是 AvroConverter 看到您的 String 键以 Avro Magic 字节 (0x0) 开头,然后检查接下来的四个 UTF-8 字节,并解析为 802 的 int。如果您使用 String 或 ByteArray 转换器,则不会发生这种情况
【解决方案2】:

我遇到了同样的问题,我意识到代码 40403 并不意味着没有找到架构,它意味着架构与所需的架构不对应。如果根本找不到架构 (40401),则存在不同的代码。

所以我所做的只是相应地更改架构,它对我有用。

【讨论】:

    猜你喜欢
    • 2019-01-21
    • 2018-06-13
    • 1970-01-01
    • 2019-06-22
    • 2022-05-31
    • 2019-12-20
    • 2017-03-26
    • 2022-09-24
    • 2014-07-12
    相关资源
    最近更新 更多