【问题标题】:Kafka - Resetting offset for partition does not workKafka - 重置分区的偏移量不起作用
【发布时间】:2020-06-24 21:45:27
【问题描述】:

我有一个主题“橙子”,有 10 个分区,1 个消费者组中有 2 个消费者。我正在使用 Spring Kafka。

由于某种原因,我需要时不时地重新读取数据,我需要重新设置偏移量。我的听众实现了ConsumerSeekAware,并在onPartitionsAssigned() 中调用callback#seekToBeginning。这工作正常,因为在日志中我看到来自 Kafka Client API (2.3.1) 的消息说:

重置分区 oranges-X to offset 0 的偏移量。这适用于所有分区。

但是,实际上只有最后一个分区被重置 (9),如果我幸运的话,有时也会重置第二个 (1)。所有其他人根本没有被重置。

真正让我头疼的是:如果我从要重置的分区列表中省略了分区 9,则所有其他分区都可以正常重置,并且一切都按预期工作。

代码很简单:

class ... implements ConsumerSeekAware {
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
...
        callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());

}
...

日志:

19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.

【问题讨论】:

  • 你能分享你的代码或一些日志吗?
  • 嗨@MickaelMaison,我已经更新了帖子。您想要在上述日志语句之后还是之前的日志?
  • 以前从未听说过这样的事情。如果你能提供一个展示这种行为的小而简洁的完整示例,我可以看看。
  • 嗨@GaryRussell,感谢您的回复。我将提供一个例子。我已经花了几天时间。我发现只有当我将 AckMode 设置为 BATCH 并将 enable.auto.commit 设置为 false 时才会发生这种情况。如果我将其更改为true,它将按预期工作。看起来有一些待处理的偏移提交? callback#seekToBeginning 的文档说 Queue a seekToBeginning operation to the consumer. The seek will occur after any pending offset commits. The consumer must be currently assigned the specified partition.
  • 您使用的是哪个版本?该 javadoc 需要修复;在 1.3 之后的版本中,线程发生了变化(感谢 KIP-62)。 onPartitionsAssignedpoll() 调用消费者线程,现在直接完成搜索而不是排队;如果您从registereSeekCallback 中保存回调并从onPartitionsAssigned 之外调用回调,则搜索仍在排队。

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

我无法重现您的问题。

这是我的测试 Spring Boot 应用程序:

@SpringBootApplication
public class So62465345Application extends AbstractConsumerSeekAware {


    private static final Logger LOG = LoggerFactory.getLogger(So62465345Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So62465345Application.class, args);
    }

    @KafkaListener(id = "so62465345", topics = "so62465345")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62465345").partitions(10).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0, 9).forEach(i -> template.send("so62465345", i, null,
                System.currentTimeMillis() + ":foo:" + i));
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        try {
            Thread.sleep(5000);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        LOG.info("Seeking on assignment");
        callback.seekToBeginning(assignments.keySet());
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        LOG.info("Seeking on idle");
        callback.seekToBeginning(assignments.keySet());
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=30000
spring.kafka.listener.poll-timeout=2000

我在onIdleContainer 中设置了一个断点,并且使用kafka-console-consumer,我看到偏移量直到下一个poll() 才真正重置。

Seeking to EARLIEST offset of partition so62465345-1 在我们执行搜索时出现,但 Resetting offset for partition so62465345-0 to offset 0 直到我们再次调用 poll() 时才会出现(然后实际重置偏移量)。

所以我确实看到当前轮询不会发生搜索,它返回 0 条记录,但下一个轮询从头开始。

【讨论】:

  • 嗨,加里,感谢您的提醒。是的,我同意 - 它似乎与 Spring 无关。希望你比我幸运,我似乎没有找到解决这个问题的方法。期待您的回音。
  • 更正;我的空闲时间太短了,以至于 seek 掩盖了 seek on assignment 发生的事情;不过,我确实看到搜索直到下一次民意调查才会生效。
  • 这很奇怪。在我的测试中,甚至下一次民意调查都没有帮助。它永远不会移动偏移量。
猜你喜欢
  • 2019-07-20
  • 1970-01-01
  • 1970-01-01
  • 2018-10-08
  • 2019-11-25
  • 2018-05-30
  • 1970-01-01
  • 2021-02-22
相关资源
最近更新 更多