【发布时间】:2021-02-24 15:28:56
【问题描述】:
所以我认为我自己陷入了困惑,因为我知道 SpringCloudStreams 有两种不同的 kafka 绑定器:
- Spring Cloud Streams Kafka Binder
- Spring Cloud Streams Kafka Streams Binder
我正在寻找正确的 YAML 设置,以在 Spring Cloud 流的普通 kafka 绑定器中定义序列化器和反序列化器:
我可以使用这个逻辑调整默认值:
spring:
main:
web-application-type: NONE
application:
name: tbfm-translator
kafka:
consumer:
group-id: ${consumer_id}
bootstrap-servers: ${kafka_servers}
cloud:
schemaRegistryClient:
endpoint: ${schema_registry}
stream:
# default:
# producer.useNativeEncoding: true
# consumer.useNativeEncoding: true
defaultBinder: kafka
kafka:
binder:
auto-add-partitions: true # I wonder if its cause this is set
auto-create-topics: true # Disabling this seem to override the server setings and will auto create
producer-properties:
# For additional properties you can check here:
# https://docs.confluent.io/current/installation/configuration/producer-configs.html
schema.registry.url: ${schema_registry}
# Disable for auto schema registration
auto.register.schemas: false
# Use only the latest schema version
use.latest.version: true
# This will use reflection to generate schemas from classes - used to validate current data set
# against the scheam registry for valid production
schema.reflection: true
# To use an avro key enable the following line
#key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
#This will use a string based key - aka not in the registry - dont need a name strategy with string serializer
key.serializer: org.apache.kafka.common.serialization.StringSerializer
# This will control the Serializer Setup
value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
这是:
spring.cloud.stream.kafka.binder.producer-properties.value.serializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer
我认为我应该能够在每个主题的基础上做到这一点:
spring:
cloud:
stream:
bindings:
my-topic:
destination: a-topic
xxxxxxxx??
我遇到过设置:
producer:
use-native-encoding: false
keySerde: <CLASS>
但这似乎不起作用。我可以设置一个简单的属性来按主题执行此操作吗?我认为keySerde 用于 Kafka-streams 实现,而不是普通的 kafka binder。
【问题讨论】:
标签: spring spring-boot apache-kafka spring-kafka spring-cloud-stream