【问题标题】:How to set a seperate bootstrap server to a DLT of a binding如何将单独的引导服务器设置为绑定的 DLT
【发布时间】:2021-12-05 05:54:15
【问题描述】:

我需要将消费的无效消息发送到 DLT,但在单独的引导服务器中。我目前有这个配置:

spring.cloud.stream.binders.some-kafka-binder.type=kafka
spring.cloud.stream.binders.some-kafka-binder.environment.spring.cloud.stream.kafka.binder.brokers=localhost:29092

spring.cloud.stream.bindings.processor-in-0.binder=some-kafka-binder
spring.cloud.stream.bindings.processor-in-0.group=${spring.application.name}
spring.cloud.stream.bindings.processor-in-0.destination=outbound-topic
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.configuration.schema.registry.url=${schema.registry.url}
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.configuration.specific.avro.reader=true
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.enable-dlq=true
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-name=outbound-topic.DLT
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-producer-properties.configuration.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-producer-properties.configuration.schema.registry.url=${schema.registry.url}
spring.cloud.stream.kafka.bindings.processor-in-0.consumer.dlq-producer-properties.configuration.bootstrap.servers=localhost:9092

...但我收到此错误:

Caused by: java.lang.IllegalStateException: bootstrap.servers cannot be overridden at the binding level; use multiple binders instead
        at org.springframework.util.Assert.state(Assert.java:76)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.getProducerFactory(KafkaMessageChannelBinder.java:560)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.getErrorMessageHandler(KafkaMessageChannelBinder.java:1148)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.getErrorMessageHandler(KafkaMessageChannelBinder.java:158)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:695)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:639)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:734)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:158)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:408)
        ... 27 common frames omitted

有没有办法在 spring cloud 中进行设置?我真的希望我不需要进行自定义 DLT 实施。

如果您问为什么需要为 DLT 设置另一个引导程序:涉及 2 个单独的 AWS KMS 账户。

【问题讨论】:

    标签: spring-boot apache-kafka spring-cloud spring-kafka spring-cloud-stream


    【解决方案1】:

    启用 DLT 时,目标需要与输入主题位于同一 Kafka 集群上。目前,binder 没有提供一种方法来为 DLT 单独使用不同的集群。如果您愿意创建一个额外的主题,我认为您可以做一个解决方法。让 DLT 在第一个集群上,这样当发生错误时,它会转到数据来自的同一集群上的 DLT。然后,您可以定义另一个直通函数,其中 DLT 是输入主题,输出主题配置有第二个集群的 binder。

    @Bean
    public Consumer<IN> consumer() {
    
    // records in error go to a DLT.
    }
    
    // input is the DLT topic from first cluster
    // output is a topic on the second cluster.
    @Bean
    public Function<IN, OUT> function() {
    
      return in -> out;
    
    }
    
    spring.cloud.stream.binders.some-kafka-binder-1.type=kafka
    spring.cloud.stream.binders.some-kafka-binder-1.environment.spring.cloud.stream.kafka.binder.brokers=localhost:29092
    spring.cloud.stream.binders.some-kafka-binder-2.type=kafka
    spring.cloud.stream.binders.some-kafka-binder-2.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
    

    另外,从 DLT 配置中删除引导配置。

    随时在 GitHub 存储库中为 Kafka binder 添加新功能请求。我们可以考虑使用StreamBridge API 来适应这个用例。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-06
    • 2021-05-24
    • 1970-01-01
    • 1970-01-01
    • 2019-11-24
    • 1970-01-01
    相关资源
    最近更新 更多