【问题标题】:Kafka Topic ordering with Spring Kafka使用 Spring Kafka 进行 Kafka 主题排序
【发布时间】:2019-06-17 00:21:39
【问题描述】:

我正在尝试建立一个系统,该系统将从两个不同的 Kafka 主题中读取 - 一个用于实时消息,一个用于批量消息。希望是,无论“批量”主题有多少消息,“实时”主题的任何内容都会被优先考虑。

似乎Spring Kafka 开箱即用 - 有时。

我得到的只是:

@KafkaListener(topics = {"sync-live", "sync-bulk"}, concurrency = "1")

我的配置是:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory
        = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

有时这会完全符合我的要求,有时则不会。事实上,有几次我看到它从“实时”主题切换到“批量”主题,但仍有待处理的实时消息!

有没有办法告诉 Spring Kafka 在第一个主题为空时只读取第二个主题?

干杯

【问题讨论】:

    标签: spring spring-boot apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    这与 Spring 无关,消费者订阅的主题的消息分发是代理的功能。

    问题是您对两个主题都使用单个使用者(侦听器容器)。

    要为每个消费者获取专用的消费者,请使用多个注释...

    @KafkaListener(groupId = "live.group", topics = "sync-live", concurrency = "1")
    @KafkaListener(groupId = "bulk.group", topics = "sync-bulk", concurrency = "1")
    public synchronized void listen(...) { ... }
    

    synchronized 将阻止两个容器同时调用监听器。如果这不是问题,请忽略它。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-11-27
      • 1970-01-01
      • 2016-12-03
      • 1970-01-01
      • 2021-10-23
      • 2019-11-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多