[Question title]: Confluent Kafka Avro producer schema error
[Posted]: 2020-11-09 08:58:01
[Question]:

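I'm using the example code from https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py to load data into a topic. I made only one change: I added "default": null to every field for schema compatibility. The data loads fine, since I can see the messages and the schema at http://localhost:9021/. If I run the kafka-avro-console-consumer command from the CLI, I can also see the data arriving in the topic. However, when I try to use the Redshift sink with the configuration provided at https://docs.confluent.io/current/connect/kafka-connect-aws-redshift/index.html, I get the error shown below. If I don't add "default": null to the fields, it runs through to the end. Any guidance would be appreciated.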

org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1812)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1567)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1543)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1226)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:108)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:491)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: INT32
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
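
The `Caused by` line reports a null default attached to a schema that Connect treats as a required INT32, which suggests a field declared with a plain (non-union) type and "default": null. A minimal sketch of how such fields could be spotted before producing is shown here; the schema, its field names, and the helper `fields_with_invalid_null_default` are hypothetical and not taken from the question.

```python
import json

# Hypothetical schema resembling what the question describes:
# plain types with "default": null added to every field.
SCHEMA_STR = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string", "default": null},
    {"name": "favorite_number", "type": "int", "default": null}
  ]
}
"""


def fields_with_invalid_null_default(schema: dict) -> list:
    """Return names of fields whose default is null but whose type
    is not a union that lists "null" as its first branch."""
    bad = []
    for field in schema.get("fields", []):
        if "default" in field and field["default"] is None:
            ftype = field["type"]
            # For a union (JSON list) look at the first branch only.
            first = ftype[0] if isinstance(ftype, list) and ftype else ftype
            if first != "null":
                bad.append(field["name"])
    return bad


bad_fields = fields_with_invalid_null_default(json.loads(SCHEMA_STR))
print(bad_fields)
```

With the hypothetical schema above, both fields are flagged, because neither type admits null.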

[Comments on the question]:

    Tags: apache-kafka avro apache-kafka-connect confluent-schema-registry


    [Answer 1]:

    Adding "default": null is not enough; you also need to change the type to a union:

    "type": ["null", "string"], "default": null
    

    Note that "null" must be the first type in the union, i.e. not:

    "type": ["string", "null"], "default": null
    

    See the discussion: http://apache-avro.679487.n3.nabble.com/How-to-declare-an-optional-field-tp4025089p4025094.html
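
As a rough illustration of the change described in this answer, the sketch below rewrites every field of an Avro record schema into a union with "null" as the first branch and a null default. The helper `make_fields_nullable` and the sample schema are assumptions made for illustration; they are not part of confluent-kafka-python or of the original example. Note that the non-null branch keeps each field's own type (for example "int"), not necessarily "string".

```python
import copy
import json


def make_fields_nullable(schema: dict) -> dict:
    """Return a copy of an Avro record schema in which every field is a
    union with "null" first and has a null default (hypothetical helper)."""
    result = copy.deepcopy(schema)
    for field in result.get("fields", []):
        ftype = field["type"]
        if isinstance(ftype, list):
            # Already a union: drop any existing "null" so it can go first.
            branches = [b for b in ftype if b != "null"]
        else:
            branches = [ftype]
        field["type"] = ["null"] + branches
        field["default"] = None
    return result


# Hypothetical input schema; field names are placeholders.
original = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
    ],
}

nullable = make_fields_nullable(original)
print(json.dumps(nullable, indent=2))
```

The resulting schema string could then be passed to the producer in place of the original one. Changing a field from a plain type to a union is a schema change, so whether it registers cleanly depends on the compatibility setting of the subject in Schema Registry.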

    [Comments]:
