【问题标题】:Kafka Consumer not printing the older messages in consoleKafka Consumer 不在控制台中打印旧消息
【发布时间】:2021-10-01 16:26:48
【问题描述】:

我正在学习 Kafka。我在 Spring Boot 中创建了一个非常基本的 Kafka 消费者。以下是相关类:

KafkaConfig.java:

@EnableKafka 
@Configuration
public class KafkaConfig {
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id6");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        
        return new DefaultKafkaConsumerFactory<>(config);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
    
}

KafkaConsumer.java

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "test", groupId = "group_id6")
    public void consume(String message) {
        System.out.println("message: " + message);
    }
}

我正在使用控制中心检查该主题的消费者并跟踪正在使用的数据。在运行此应用程序时,它与 Kafka 和所有分区连接良好,我可以在控制中心看到所有数据正在获取,但在我的 java 控制台中没有打印任何内容。但我注意到在向 Kafka 发送一些新数据时,它会打印在 Java 控制台中(即在运行我的消费者后将新数据发送到 Kafka)。它应该以这种方式表现吗?还是我做错了什么?根据我的理解,Kafka 主题中的旧消息也应该打印在控制台中。

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    如果消费者再次启动时已经存在消费者组,它将从它在偏移量(或位置)中停止的位置继续,因为该偏移量由 Kafka 和/或 ZooKeeper 存储。

    另一方面,如果在现有主题中启动新的消费者组,则没有偏移存储。在这种情况下,偏移量要么从主题的开头开始,要么从主题的结尾开始。

    基本上你应该要么创建一个新的消费者组并将其配置为从最小的偏移量开始,要么重置当前消费者组的偏移量。

    【讨论】:

    • 是的,它是一个现有主题的新消费者组。所以你是说当我为一个现有的主题创建一个新的消费者组时,我需要做一些配置,让偏移量从主题的开头开始?我的印象是一个新的消费者组总是从一个主题的偏移量 0 开始,即使它之前不存在。
    • 您可以使用 Kafka API 来配置您的消费者并分配手动偏移量。例如,您可以使用来自 KafkaConsumer 的 seekToBeginning。
    • 您需要将auto.offset.reset 设置为earliest 以便新组获取旧消息。它的默认值为latest
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-20
    相关资源
    最近更新 更多