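【Posted】: 2021-08-27 14:10:52
【Question】:
I am trying to send a message to a specific Kafka topic and then receive it from that topic. I have three configuration classes, one each for the consumer, the producer, and the topic: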
public class KafkaTopicConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        return new KafkaAdmin(configs);
    }
}
public class KafkaConsumerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public Consumer<String, String> consumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                globalConfiguration.kafka().bootstrapAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                globalConfiguration.kafka().groupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
public class KafkaProducerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaProducer<>(configProps);
    }
}
The GlobalConfiguration class stores all of my properties. For Kafka:
bootstrapAddress = "localhost:9092"
groupId = "KafkaExampleConsumer"
Then I send a message like this:
private void sendMessage(final String topic, final String message) {
    kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
    Future<RecordMetadata> sendResponse = kafkaProducer.send(producerRecord);
    kafkaProducer.flush();
    boolean isSent = sendResponse.isDone();
}
I check whether the message was sent with sendResponse.isDone(), and it returns true. But then I try to receive the message:
protected String receiveMessage(final String topic, final String message) {
    kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
    consumer.subscribe(Collections.singleton(topic));
    ConsumerRecords<String, String> consumerRecords =
            consumer.poll(Duration.ofMillis(10000));
    consumerRecords.isEmpty();
}
And ConsumerRecords is always empty. What could be the problem?
【Comments】:
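One detail worth checking in sendMessage: Future.isDone() only reports that the send finished, not that it succeeded. After flush() the future is complete whether the broker acknowledged the record or the send failed, so true is not proof of delivery. Calling get() surfaces a failure as an ExecutionException. The sketch below shows the difference with plain java.util.concurrent futures; the class name SendResultCheck and the CompletableFuture stand-ins are hypothetical and only imitate the Future returned by kafkaProducer.send(...), so no broker is needed to run it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SendResultCheck {

    // Returns true only if the future completed without an exception.
    static boolean completedSuccessfully(Future<?> future) {
        try {
            future.get(); // blocks until completion; a failure is rethrown as ExecutionException
            return true;
        } catch (ExecutionException e) {
            return false; // the asynchronous operation failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for the Future returned by kafkaProducer.send(...).
        CompletableFuture<String> ok = CompletableFuture.completedFuture("metadata");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated send failure"));

        System.out.println("failed.isDone() = " + failed.isDone());
        System.out.println("failed succeeded = " + completedSuccessfully(failed));
        System.out.println("ok succeeded = " + completedSuccessfully(ok));
    }
}
```

In sendMessage the same idea would mean replacing sendResponse.isDone() with sendResponse.get() inside a try/catch, assuming a blocking call is acceptable there.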
-
Try checking the Kafka topic from the command line.
On Linux: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
On Windows: bin/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
After running the command above, you should see the messages. Please check and reply.
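If the console consumer does show the messages with --from-beginning, one possible explanation (not confirmed in this thread) is the consumer's starting offset. The Kafka consumer setting auto.offset.reset defaults to latest, so a consumer group with no committed offsets starts at the end of the topic and skips records produced before it joined. Below is a minimal sketch of the consumer properties with that setting changed to earliest. The class name EarliestOffsetConsumerProps and the build method are hypothetical; the keys are written as plain strings so the snippet compiles without the Kafka client on the classpath, and they correspond to the ConsumerConfig constants used in the question.

```java
import java.util.Properties;

class EarliestOffsetConsumerProps {

    // Builds the same consumer settings as the question, plus auto.offset.reset.
    static Properties build(String bootstrapAddress, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapAddress);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: the group has no committed offsets yet, so this setting decides
        // where reading starts. "earliest" begins at the oldest available record.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the GlobalConfiguration properties in the question.
        Properties props = build("localhost:9092", "KafkaExampleConsumer");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

These properties could be passed to new KafkaConsumer<>(props) in place of the ones built in KafkaConsumerConfiguration. The setting only takes effect while the group has no committed offset for the partition.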
Tags: java spring-boot apache-kafka