【发布时间】:2020-06-24 21:45:27
【问题描述】:
我有一个主题“橙子”,有 10 个分区,1 个消费者组中有 2 个消费者。我正在使用 Spring Kafka。
由于某种原因,我需要时不时地重新读取数据,我需要重新设置偏移量。我的听众实现了ConsumerSeekAware,并在onPartitionsAssigned() 中调用callback#seekToBeginning。这工作正常,因为在日志中我看到来自 Kafka Client API (2.3.1) 的消息说:
重置分区 oranges-X to offset 0 的偏移量。这适用于所有分区。
但是,实际上只有最后一个分区被重置 (9),如果我幸运的话,有时也会重置第二个 (1)。所有其他人根本没有被重置。
真正让我头疼的是:如果我从要重置的分区列表中省略了分区 9,则所有其他分区都可以正常重置,并且一切都按预期工作。
代码很简单:
class ... implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
...
callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());
}
...
日志:
19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.
【问题讨论】:
-
你能分享你的代码或一些日志吗?
-
嗨@MickaelMaison,我已经更新了帖子。您想要在上述日志语句之后还是之前的日志?
-
以前从未听说过这样的事情。如果你能提供一个展示这种行为的小而简洁的完整示例,我可以看看。
-
嗨@GaryRussell,感谢您的回复。我将提供一个例子。我已经花了几天时间。我发现只有当我将 AckMode 设置为 BATCH 并将
enable.auto.commit设置为false时才会发生这种情况。如果我将其更改为true,它将按预期工作。看起来有一些待处理的偏移提交?callback#seekToBeginning的文档说Queue a seekToBeginning operation to the consumer. The seek will occur after any pending offset commits. The consumer must be currently assigned the specified partition. -
您使用的是哪个版本?该 javadoc 需要修复;在 1.3 之后的版本中,线程发生了变化(感谢 KIP-62)。
onPartitionsAssigned从poll()调用消费者线程,现在直接完成搜索而不是排队;如果您从registereSeekCallback中保存回调并从onPartitionsAssigned之外调用回调,则搜索仍在排队。
标签: apache-kafka kafka-consumer-api spring-kafka