【问题标题】:Getting SerializationException while producing avro records to an avro kafka topic在为 avro kafka 主题生成 avro 记录时获取 SerializationException
【发布时间】:2022-08-04 13:20:46
【问题描述】:

我们有一个 akka-scala 应用程序,我们正在处理传入的消息。之后,我们尝试将该消息写入一个 kafka 主题,该主题是一个 avro kafka 主题。在编写时,我们得到以下异常:

org.apache.kafka.common.errors.SerializationException:注册 Avro 架构时出错 引起:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:内部服务器错误;错误代码:500

我们检查了模式注册表工作正常并且主题和版本存在。我们没有注册任何已经存在的新模式。我们正在使用 scala 2.13.8 并尝试使用不同的融合 kafka avro 序列化程序版本,如 5.1.0、5.2.0、5.3.0、6.1.3。你能告诉这可能是什么原因。

  • 您需要查看注册表服务器日志。显然,如果它有“内部服务器错误”,它就不能正常工作
  • 但是,当我们进入一个 pod 并 curl 到模式注册表端点时,我们得到 200 响应。我们还通过使用来自 pod 的 curl 来检索模式。
  • 然后,您的外部客户端正在传递一些服务器无法处理的无关 HTTP 标头/内容,应从日志中指出。您还可以在其属性中设置debug=true 以获取更多信息。没有这些日志,我们无法回答问题可能是什么
  • 我们发现它正在尝试注册已经存在的模式。从报告的这个问题来看,在这种情况下我们可能会收到 500 错误:github.com/confluentinc/schema-registry/issues/1715 我正在尝试在生产者设置本身中禁用 auto.register.schemas。
  • 是的,生产者总是会尝试注册它的模式。主要是检查兼容性是否在请求之间发生了变化...但是服务器日志仍然会指示真正的错误

标签: scala apache-kafka avro confluent-schema-registry


【解决方案1】:

禁用自动注册模式后,此错误已修复。我们在生产者设置中设置了标志auto.register.schemas = false 并解决了问题。

以下是完整的生产者设置供参考:

object KafkaSinkSettings {
  val BootstrapServers = sys.env.get(Constants.BOOTSTRAP_SERVERS)
  val ProducerConfig = Constants.PRODUCER_CONFIG
  val SchemaRegistryUrl = sys.env.get(Constants.SCHEMA_REGISTRY_URL)
  val Topic = sys.env.get(Constants.TOPIC)
  val ClientId = Constants.METRICS_PREFIX
  val MaxInFlightReqPerConn = sys.env.get(Constants.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
  val RetriesConfig = sys.env.get(Constants.RETRIES_CONFIG)
  val RequestTimeoutMSConfig = sys.env.get(Constants.REQUEST_TIMEOUT_MS_CONFIG)
  val RetryBackoffMsConfig = sys.env.get(Constants.RETRY_BACKOFF_MS_CONFIG)

  def apply(implicit config: Config): KafkaSinkSettings = new KafkaSinkSettings()
}

class KafkaSinkSettings(implicit config: Config) {

  val producerConfig = config.getConfig(KafkaSinkSettings.ProducerConfig)

  val kafkaAvroSerDeConfig = Map[String, Any](
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> KafkaSinkSettings.SchemaRegistryUrl.getOrElse(
      config.getString("kafka.schema-registry-url")),
    AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS -> config.getString("kafka.auto-register-schemas"),

    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> KafkaSinkSettings.BootstrapServers.getOrElse(
      config.getString("kafka.bootstrap-servers")),
    KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString
  )


 
  def createAvroProducerSettings(): ProducerSettings[String, AnyRef] = {

    val kafkaAvroSerializer = new KafkaAvroSerializer()
    kafkaAvroSerializer.configure(kafkaAvroSerDeConfig.asJava, false)

    val producerSettings = ProducerSettings(producerConfig, new StringSerializer, kafkaAvroSerializer)
      .withBootstrapServers(KafkaSinkSettings.BootstrapServers.getOrElse(
        config.getString("kafka.bootstrap-servers")))
      .withProperty(ProducerConfig.CLIENT_ID_CONFIG, KafkaSinkSettings.ClientId + randomClientIdPostfix)
      .withProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, KafkaSinkSettings.MaxInFlightReqPerConn.getOrElse(
        config.getString("kafka.max-in-flight-requests-per-connection")
      ))
      .withProperty(ProducerConfig.RETRIES_CONFIG, KafkaSinkSettings.RetriesConfig.getOrElse(
        config.getString("kafka.retries-config")
      ))
      .withProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, KafkaSinkSettings.RequestTimeoutMSConfig.getOrElse(
        config.getString("kafka.request-timeout-ms-config")
      ))
      .withProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, KafkaSinkSettings.RetryBackoffMsConfig.getOrElse(
        config.getString("kafka.retry-backoff-ms-config")
      ))
 
    producerSettings
  }

}

【讨论】:

    猜你喜欢
    • 2019-09-10
    • 2018-10-14
    • 2020-07-26
    • 2015-05-01
    • 1970-01-01
    • 2020-08-20
    • 2018-06-20
    • 2018-01-12
    • 2018-10-27
    相关资源
    最近更新 更多