【问题标题】:Control enabling/disabling Kafka consumers in Spring Boot在 Spring Boot 中控制启用/禁用 Kafka 消费者
【发布时间】:2019-06-22 21:52:44
【问题描述】:

我在 Spring Boot 中配置了几个 Kafka 消费者。这就是 kafka.properties 的样子(这里只列出一个消费者的配置):

kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=

这里是配置:

@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, String> pindropConsumerFactory() {
        Map<String, Object> dataRiverProps = new HashMap<>();

        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));

        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pindropConsumerFactory());
        return factory;
    }
}

这是消费者:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;


    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        // processing message
    }
}

有没有办法让我使用道具“kafka.enabled”,以便我可以控制这个消费者的创建或消息检索?非常感谢!

【问题讨论】:

    标签: java spring-boot configuration kafka-consumer-api


    【解决方案1】:

    您可以通过在消费者中使用属性 autoStartup (true/false) 来做到这一点,如下所示 -

    @KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
            containerFactory = "kafkaListenerContainerFactory",autoStartup = "${listen.auto.start:false}")
    public void consume(String message) {
        //System.out.println("Consumed message: " + message);
    }
    

    【讨论】:

    • autoStartupspring-kafka:2.2 起可用
    • @Bean public ConcurrentKafkaListenerContainerFactory kafkaListener(){ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setAutoStartup(false);返厂; }
    • 我个人正在寻找如何在运行时对已启用的 KafkaListener 执行此操作,并发现这很有帮助:github.com/spring-projects/spring-kafka/issues/938
    【解决方案2】:

    要禁用 Kafka 配置,您可以,例如:

    1. 用 KafkaConsumerConfig 注释

      @ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)

    2. 删除KafkaConsumer 类上的@Component 并将其定义为KafkaConsumerConfig 中的@Bean。

    在 KafkaConsumer 中控制消息检索:

    1. 只需在 KafkaConsumer @Value("kafka.enabled") private Boolean enabled; 中获取属性值

    2. 然后在用@KafkaListener注解的方法中使用简单的if。

    【讨论】:

    • 抱歉我的回复晚了。我一直在出差的路上,事情变得疯狂......我想选择选项 1。但我没有遵循选项 1 中的第 2 点。你能更具体一点吗?非常感谢!
    • 我认为他的意思是在您的配置中添加一个属性,然后在您使用消息并检查的方法中读取它: if(enabled) { consume message } 我目前面临条件启用kafka的类似问题消费者并会选择第一个解决方案
    • 嗨@Hua!你可以找到答案here。第 2 点不是严格要求的——它基本上是为了保持 Spring 上下文的清洁。因此,当 Kafka 被禁用时,不会创建以某种方式依赖于 Kafka 的 bean。
    • @Zgurskyi 非常感谢您的帮助!我回到原来的帖子,可以看到我做得不够清楚。我的情况是我有多个消费者,并且希望单独控制每个消费者的创建——一些基于配置文件的打开和其他关闭。您的示例显示了如何控制所有消费者的创建。但我认为这是一个很好的例子,我会看到注释适用于个人消费者!再次感谢!
    • @OleksiiZghurskyi:您提供的链接指向“404”(未找到页面)。很高兴提供预期的链接
    猜你喜欢
    • 2019-05-12
    • 2016-12-09
    • 1970-01-01
    • 1970-01-01
    • 2018-02-18
    • 2022-01-11
    • 1970-01-01
    • 2018-10-03
    • 1970-01-01
    相关资源
    最近更新 更多