【发布时间】:2018-03-27 16:40:30
【问题描述】:
KafkaPropertiesjava文档:
/**
* What to do when there is no initial offset in Kafka or if the current offset
* does not exist any more on the server.
*/
private String autoOffsetReset;
我有 hello world 应用程序,其中包含 application.properties
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=latest
在这种情况下,所有条目都会调用@KafkaListener 方法。但预期的结果是 @KafkaListener 方法仅针对我发送的最新 3 个选项调用。我尝试使用另一个选项:
spring.kafka.consumer.auto-offset-reset=earlisest
但行为相同。
你能解释一下这些东西吗?
附言
代码示例:
@SpringBootApplication
public class Application implements CommandLineRunner {
public static Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Autowired
private KafkaTemplate<String, String> template;
private final CountDownLatch latch = new CountDownLatch(3);
@Override
public void run(String... args) throws Exception {
this.template.send("spring_kafka_topic", "foo1");
this.template.send("spring_kafka_topic", "foo2");
this.template.send("spring_kafka_topic", "foo3");
latch.await(60, TimeUnit.SECONDS);
logger.info("All received");
}
@KafkaListener(topics = "spring_kafka_topic")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
logger.info(cr.toString());
latch.countDown();
}
}
更新:
行为不依赖于spring.kafka.consumer.auto-offset-reset
它只取决于 spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit
如果我设置spring.kafka.consumer.enable-auto-commit=false - 我会看到所有记录。
如果我设置spring.kafka.consumer.enable-auto-commit=true - 我只看到最后 3 条记录。
请澄清spring.kafka.consumer.auto-offset-reset属性的含义
【问题讨论】:
标签: java spring apache-kafka kafka-producer-api spring-kafka