【发布时间】:2021-10-01 16:26:48
【问题描述】:
我正在学习 Kafka。我在 Spring Boot 中创建了一个非常基本的 Kafka 消费者。以下是相关类:
KafkaConfig.java:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id6");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaConsumer.java
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test", groupId = "group_id6")
public void consume(String message) {
System.out.println("message: " + message);
}
}
我正在使用控制中心检查该主题的消费者并跟踪正在使用的数据。在运行此应用程序时,它与 Kafka 和所有分区连接良好,我可以在控制中心看到所有数据正在获取,但在我的 java 控制台中没有打印任何内容。但我注意到在向 Kafka 发送一些新数据时,它会打印在 Java 控制台中(即在运行我的消费者后将新数据发送到 Kafka)。它应该以这种方式表现吗?还是我做错了什么?根据我的理解,Kafka 主题中的旧消息也应该打印在控制台中。
【问题讨论】:
标签: java apache-kafka kafka-consumer-api spring-kafka