【问题标题】:How to pass dynamic topic name to @KafkaListener(topics) from environment variable如何从环境变量将动态主题名称传递给@KafkaListener(topics)
【发布时间】:2019-05-31 00:35:41
【问题描述】:

我正在编写一个 Kafka 消费者。我需要将环境变量主题名称传递给@KafkaListener(topics = ...)。这是我迄今为止尝试过的:

 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener; 
 import org.springframework.stereotype.Service;

 @Service
 public class KafkaConsumer {

     @Autowired
     private EnvProperties envProperties;

     private final String topic = envProperties.getTopic();

     @KafkaListener(topics = "#{'${envProperties.getTopic()}'}", groupId = "group_id")
     public void consume(String message) {
        logger.info("Consuming messages " +envProperties.getTopic());
     }
}

topics = "#{'${envProperties.getTopic()}'}" 行出现错误,应用程序无法启动。

如何从环境变量中动态设置这个主题名称?

【问题讨论】:

    标签: java spring-boot kafka-consumer-api spring-kafka spring-el


    【解决方案1】:

    在 KafkaConsumer 类中,您需要进行以下更改:

    @Autowired
    public EnvProperties envProperties;
    
    @KafkaListener(topics = "#{kafkaConsumer.envProperties.getTopic()}"
    

    它对我有用。

    【讨论】:

      【解决方案2】:

      通常,您不能从声明 SpEL 的 bean 中引用字段或属性。但是,@KafkaListener 有特殊的语法来支持它。

      See the documentation.

      从 2.1.2 版开始,SpEL 表达式支持特殊标记 __listener,它是一个伪 bean 名称,表示存在此注释的当前 bean 实例。

      所以,如果你将public EnvProperties getEnvProperties() 添加到类中,那么类似

      #{__listener.envProperties.topic}
      

      应该可以。

      【讨论】:

        猜你喜欢
        • 2020-01-17
        • 1970-01-01
        • 2018-01-08
        • 2020-10-23
        • 2013-12-29
        • 2016-07-08
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多