【问题标题】:Confluent Kafka Avro Serializer and Spring CloudConfluent Kafka Avro 序列化器和 Spring Cloud
【发布时间】:2019-04-04 22:15:10
【问题描述】:

我正在尝试使用 Spring Cloud 和 Kafka Avro Serializer 在 Kafka 上生成一个事件。

在我的 application.yml 中,我有以下配置,但是当 Serializer 尝试生成消息时,它生成为字节,因为传递给 KafkaSerializer 中的 getScheme 方法的对象是字节数组,它不是 GenericRecord。我认为我需要 Spring Cloud 中的特定 MessageConverter,但我没有找到。

cloud:
stream:
  kafka:
    binder:
      brokers:
        - 'localhost:9092'
      useNativeDecoding: true
    bindings:
      Ptr-output:
          producer:
            configuration:
              schema.registry.url: 'http://localhost:8081'
              key.serializer: org.apache.kafka.common.serialization.StringSerializer
              value.serializer: com.abc.message.ptr.KafkaSerializer
  schemaRegistryClient:
    endpoint: 'http://localhost:8081'
  bindings:
     Ptr-output:
      contentType: application/*+avro
      destination: Ptr
  schema:
    avro:
      schema-locations: 'classpath:avro/Ptr.avsc'
      dynamic-schema-generation-enabled: false

我该怎么办?我该如何解决?

【问题讨论】:

    标签: apache-kafka spring-cloud avro spring-cloud-stream confluent-platform


    【解决方案1】:

    尝试将序列化程序设置为io.confluent.kafka.serializers.KafkaAvroDeserializer

    【讨论】:

      【解决方案2】:

      参见the documentation,特别是需要设置生产者属性useNativeEncoding

      使用NativeEncoding

      设置为true时,出站消息由客户端库直接序列化,需要相应配置(例如设置合适的Kafka生产者值序列化器)。使用此配置时,出站消息编组不基于绑定的 contentType。当使用本机编码时,消费者有责任使用适当的解码器(例如,Kafka 消费者值反序列化器)来反序列化入站消息。此外,当使用本机编码和解码时,headerMode=embeddedHeaders 属性将被忽略,并且标头不会嵌入到消息中。请参阅消费者属性 useNativeDecoding。

      【讨论】:

        猜你喜欢
        • 2019-11-18
        • 1970-01-01
        • 1970-01-01
        • 2019-07-30
        • 2020-10-15
        • 2018-01-31
        • 2020-03-20
        • 2020-10-05
        • 1970-01-01
        相关资源
        最近更新 更多