【Question title】: ConsumerRecords is always empty in Kafka (Java), although Future<RecordMetadata>.isDone() returns true
【Posted】: 2021-08-27 14:10:52
【Question】:

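I am trying to send a message to a specific Kafka topic and then receive it from that topic. I have three configuration classes, one each for the topic, the consumer, and the producer: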

public class KafkaTopicConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        return new KafkaAdmin(configs);
    }
}
public class KafkaConsumerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public Consumer<String, String> consumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                globalConfiguration.kafka().bootstrapAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                globalConfiguration.kafka().groupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        return new KafkaConsumer<>(props);
    }
}
public class KafkaProducerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaProducer<>(configProps);
    }
}

The GlobalConfiguration class stores all of my properties. For Kafka:

bootstrapAddress = "localhost:9092"
groupId = "KafkaExampleConsumer"

Then I send a message like this:

    private void sendMessage(final String topic, final String message) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
        Future<RecordMetadata> sendResponse = kafkaProducer.send(producerRecord);
        kafkaProducer.flush();
        boolean isSent = sendResponse.isDone();
    } 

I check whether the message was sent using sendResponse.isDone(), and it returns true. But then I try to receive the message:
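A side note on that check: Future.isDone() returns true once the operation has completed for any reason, including failure, so on its own it does not confirm that the broker accepted the record. The following is a minimal sketch using only java.util.concurrent, with no Kafka involved; the class name IsDoneDemo, the helper outcome, and the sample messages are hypothetical and exist only for illustration. It shows that calling get() is what surfaces an error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class IsDoneDemo {

    // Returns "ok: <value>" if the future succeeded,
    // or "failed: <cause message>" if it completed with an exception.
    public static String outcome(Future<String> future) {
        try {
            return "ok: " + future.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send that succeeded (hypothetical value).
        CompletableFuture<String> succeeded = CompletableFuture.completedFuture("offset 0");

        // Stand-in for a send that failed (hypothetical error message).
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));

        // Both futures report isDone() == true; only get() tells them apart.
        System.out.println(succeeded.isDone() + " -> " + outcome(succeeded));
        System.out.println(failed.isDone() + " -> " + outcome(failed));
    }
}
```

In sendMessage above, the analogous check would be to call sendResponse.get() and inspect the returned RecordMetadata, catching ExecutionException if the send failed.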

protected String receiveMessage(final String topic, final String message) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
        consumer.subscribe(Collections.singleton(topic));
        ConsumerRecords<String, String> consumerRecords =
                consumer.poll(Duration.ofMillis(10000));
        consumerRecords.isEmpty();
    }

And ConsumerRecords is always empty. What could be the problem?

【Comments】:

  • Try checking the Kafka topic from the command line. On Linux: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning. On Windows: bin/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
  • After running the command above, you should see the messages. Please check and report back.

Tags: java spring-boot apache-kafka


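【Solution 1】:

By default (auto.offset.reset=latest), a consumer group with no committed offsets starts reading at the end of the partition, so records produced before the consumer joined are skipped. If you want to consume records that were sent earlier, rather than only those sent during the 10-second poll after the consumer starts, you need to add: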

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            OffsetResetStrategy.EARLIEST.name().toLowerCase());
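
For reference, here is a sketch of the full set of consumer properties from the question with this setting added, written with the literal Kafka configuration keys so that it compiles without the Kafka client library. The class name ConsumerPropsSketch and the method consumerProps are hypothetical. Note that auto.offset.reset only takes effect when the group has no committed offset for the partition; a group that has already committed offsets resumes from them regardless of this setting.

```java
import java.util.Properties;

public class ConsumerPropsSketch {

    // Builds the consumer properties from the question, plus auto.offset.reset.
    // The string keys are the values behind the ConsumerConfig.*_CONFIG constants.
    public static Properties consumerProps(String bootstrapAddress, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapAddress);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Same value as OffsetResetStrategy.EARLIEST.name().toLowerCase()
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the question's GlobalConfiguration.
        Properties props = consumerProps("localhost:9092", "KafkaExampleConsumer");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties object could be passed to new KafkaConsumer<>(props) in place of the one built in KafkaConsumerConfiguration.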
