【问题标题】:How to apply spring.cloud.stream.kafka.bindings configuration property to all consumers如何将 spring.cloud.stream.kafka.bindings 配置属性应用于所有消费者
【发布时间】:2018-08-09 17:18:05
【问题描述】:

我能够使用 application.properties 中的以下属性将 partition.assignment.strategy 应用于单通道:

spring.cloud.stream.kafka.bindings.input.consumer.configuration.partition.assignment.strategy

这是根据https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_kafka_consumer_properties

我想要实现的是将 partition.assignment.strategy 应用于具有单个属性的所有通道,并避免为所有通道编写相同的内容。

尝试设置 spring.cloud.stream.kafka.binder.configuration 但没有帮助。

也许有人知道如何做到这一点?

其他信息:我使用的是 Spring Cloud 1.3.2.RELEASE。

提前致谢!

【问题讨论】:

  • 我编辑了我刚刚测试过的答案,它对我有用。

标签: java spring spring-boot spring-cloud spring-kafka


【解决方案1】:

我刚刚测试了它,它对我来说很好......

spring.cloud.stream.kafka.binder.configuration.partition.assignment.strategy=\
    org.apache.kafka.clients.consumer.RoundRobinAssignor

@SpringBootApplication
@EnableBinding(Sink.class)
public class So49053074Application {

    public static void main(String[] args) {
        SpringApplication.run(So49053074Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(byte[] in) {

    }

}

2018-03-01 11:01:28.301  INFO 46708 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RoundRobinAssignor]
    receive.buffer.bytes = 65536
    ...

【讨论】:

  • 是的,您的简单示例按预期工作。 spring.cloud.stream.kafka.binder.configuration 似乎在那个简单的例子中工作,但由于某种原因,它不适用于我们更复杂的服务:) 将尝试找出原因......如果你告诉我知道是什么原因造成的。
  • 很奇怪。我想不出复杂性会产生影响的任何原因。
猜你喜欢
  • 2023-03-27
  • 2020-04-10
  • 2022-01-09
  • 1970-01-01
  • 2017-02-14
  • 1970-01-01
  • 2020-09-05
  • 1970-01-01
  • 2019-05-04
相关资源
最近更新 更多