【问题标题】:How does spring.kafka.consumer.auto-offset-reset works in spring-kafkaspring.kafka.consumer.auto-offset-reset 如何在 spring-kafka 中工作
【发布时间】:2018-03-27 16:40:30
【问题描述】:

KafkaPropertiesjava文档:

/**
  * What to do when there is no initial offset in Kafka or if the current offset
  * does not exist any more on the server.
  */
private String autoOffsetReset;

我有 hello world 应用程序,其中包含 application.properties

spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=latest

在这种情况下,所有条目都会调用@KafkaListener 方法。但预期的结果是 @KafkaListener 方法仅针对我发送的最新 3 个选项调用。我尝试使用另一个选项:

spring.kafka.consumer.auto-offset-reset=earlisest

但行为相同。

你能解释一下这些东西吗?

附言

代码示例:

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    private final CountDownLatch latch = new CountDownLatch(3);

    @Override
    public void run(String... args) throws Exception {
        this.template.send("spring_kafka_topic", "foo1");
        this.template.send("spring_kafka_topic", "foo2");
        this.template.send("spring_kafka_topic", "foo3");
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = "spring_kafka_topic")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }
}

更新:

行为不依赖于
spring.kafka.consumer.auto-offset-reset

它只取决于 spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.enable-auto-commit

如果我设置spring.kafka.consumer.enable-auto-commit=false - 我会看到所有记录。

如果我设置spring.kafka.consumer.enable-auto-commit=true - 我只看到最后 3 条记录。

请澄清spring.kafka.consumer.auto-offset-reset属性的含义

【问题讨论】:

    标签: java spring apache-kafka kafka-producer-api spring-kafka


    【解决方案1】:

    Spring Boot 中的 KafkaProperties 是这样做的:

    public Map<String, Object> buildProperties() {
            Map<String, Object> properties = new HashMap<String, Object>();
            if (this.autoCommitInterval != null) {
                properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                        this.autoCommitInterval);
            }
            if (this.autoOffsetReset != null) {
                properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                        this.autoOffsetReset);
            }
    

    这个buildProperties()是从buildConsumerProperties()中使用的,而buildConsumerProperties()又是:

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<Object, Object>(
                this.properties.buildConsumerProperties());
    }
    

    因此,如果您使用自己的 ConsumerFactory bean 定义,请务必重用那些 KafkaProperties: https://docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/htmlsingle/#boot-features-kafka-extra-props

    更新

    好的。我知道发生了什么。

    尝试添加这个属性:

    spring.kafka.consumer.enable-auto-commit=false
    

    这样我们就不会基于某个提交间隔进行异步自动提交。

    我们应用程序中的逻辑基于latch.await(60, TimeUnit.SECONDS); 之后的退出事实。当我们得到3 预期记录时,我们退出。这样,消费者的异步自动提交可能还不会发生。因此,下次您运行应用程序时,消费者会从未提交的偏移量轮询数据。

    当我们关闭自动提交时,我们有一个AckMode.BATCH,它是同步执行的,我们能够看到这个foo 消费者组的主题中真正最新的记录器。

    【讨论】:

    • Artem,我只是用@KafkaListener注解的方法
    • 好。好吧,也许您可​​以与我们分享您的简单 Spring Boot 应用程序,以便在我们这边玩?
    • 当然,已添加。这是文档中的示例
    • 是不是因为我开始consumer和producer一起?
    • 请在我的回答中查看更新
    猜你喜欢
    • 2018-01-27
    • 1970-01-01
    • 2020-09-27
    • 1970-01-01
    • 2018-08-30
    • 2019-03-22
    • 1970-01-01
    • 2017-06-02
    • 2018-12-17
    相关资源
    最近更新 更多