【问题标题】:Spring Cloud Streams kafka binder - topic serialization configurationSpring Cloud Streams kafka binder - 主题序列化配置
【发布时间】:2021-02-24 15:28:56
【问题描述】:

所以我认为我自己陷入了困惑,因为我知道 SpringCloudStreams 有两种不同的 kafka 绑定器:

  • Spring Cloud Streams Kafka Binder
  • Spring Cloud Streams Kafka Streams Binder

我正在寻找正确的 YAML 设置,以在 Spring Cloud 流的普通 kafka 绑定器中定义序列化器和反序列化器:

我可以使用这个逻辑调整默认值:

spring:
  main:
    web-application-type: NONE
  application:
    name: tbfm-translator
  kafka:
    consumer:
      group-id: ${consumer_id}
    bootstrap-servers: ${kafka_servers}
  cloud:
    schemaRegistryClient:
      endpoint: ${schema_registry}
    stream:
#      default:
#        producer.useNativeEncoding: true
#        consumer.useNativeEncoding: true
      defaultBinder: kafka
      kafka:
        binder:
          auto-add-partitions: true # I wonder if its cause this is set
          auto-create-topics: true # Disabling this seem to override the server setings and will auto create

          producer-properties:
            # For additional properties you can check here:
            # https://docs.confluent.io/current/installation/configuration/producer-configs.html

            schema.registry.url: ${schema_registry}

            # Disable for auto schema registration
            auto.register.schemas: false

            # Use only the latest schema version
            use.latest.version: true

            # This will use reflection to generate schemas from classes - used to validate current data set
            # against the scheam registry for valid production
            schema.reflection: true

            # To use an avro key enable the following line
            #key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

            #This will use a string based key - aka not in the registry - dont need a name strategy with string serializer
            key.serializer: org.apache.kafka.common.serialization.StringSerializer

            # This will control the Serializer Setup
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

这是:

spring.cloud.stream.kafka.binder.producer-properties.value.serializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer

我认为我应该能够在每个主题的基础上做到这一点:


spring:
  cloud:
    stream:
      bindings:
        my-topic:
          destination: a-topic
          xxxxxxxx??

我遇到过设置:

          producer:
            use-native-encoding: false
            keySerde: <CLASS>

但这似乎不起作用。我可以设置一个简单的属性来按主题执行此操作吗?我认为keySerde 用于 Kafka-streams 实现,而不是普通的 kafka binder。

【问题讨论】:

    标签: spring spring-boot apache-kafka spring-kafka spring-cloud-stream


    【解决方案1】:

    use-native-encoding 必须是 true 才能使用您自己的序列化程序。

    spring.cloud.stream.kafka.bindings.my-topic.producer.configuration.value.serializer: ...

    the documentation for kafka-specific producer properties

    配置

    使用包含通用 Kafka 生产者属性的键/值对映射。

    默认值:空地图。

    【讨论】:

      【解决方案2】:
      
      
          stream:
            bindings: # Define output topics here and then again in the kafka.bindings section
              test:
                destination: multi-output
                producer:
                  useNativeDecoding: true
      
            kafka:
              bindings:
                test:
                  destination: multi-output
                  producer:
                    configuration:
                      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      

      这似乎可行 - 但很烦人,我必须在两个地方复制绑定定义

      让我们想要回避 YAML 样式定义

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-03-16
        • 2022-11-19
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-06-03
        • 2022-12-10
        • 1970-01-01
        相关资源
        最近更新 更多