任务需要时间;轮询 0 通常意味着轮询将在它发生之前退出。
您应该在 subscribe() 方法中添加一个ConsumerRebalanceListener 回调并在onPartitionsAssigned() 中执行搜索。
编辑
@SpringBootApplication
public class So69121558Application {
public static void main(String[] args) {
SpringApplication.run(So69121558Application.class, args);
}
@Bean
public ApplicationRunner runner(ConsumerFactory<String, String> cf, KafkaTemplate<String, String> template) {
return args -> {
template.send("so69121558", "test");
Consumer<String, String> consumer = cf.createConsumer("group", "");
consumer.subscribe(Collections.singletonList("so69121558"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
});
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
records.forEach(System.out::println);
Thread.sleep(5000);
consumer.close();
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
}
}
这里有几个使用 Spring 方式的示例 - 只需将其中之一(或两者)添加到上述类中即可。
@KafkaListener(id = "so69121558", topics = "so69121558")
void listen(ConsumerRecord<?, ?> rec) {
System.out.println(rec);
}
@KafkaListener(id = "so69121558-1", topics = "so69121558")
void pojoListen(String in) {
System.out.println(in);
}
搜索的方式也有所不同;这是完整的示例:
@SpringBootApplication
public class So69121558Application extends AbstractConsumerSeekAware {
public static void main(String[] args) {
SpringApplication.run(So69121558Application.class, args);
}
@KafkaListener(id = "so69121558", topics = "so69121558")
void listen(ConsumerRecord<?, ?> rec) {
System.out.println(rec);
}
@KafkaListener(id = "so69121558-1", topics = "so69121558")
void pojoListen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}