【发布时间】:2014-08-08 19:51:15
【问题描述】:
我正在使用 Spring Integration Kafka 扩展来读取和处理来自 Kafka in Java 应用程序的消息。据我所知,它使用不允许在 Zookeeper 中完全管理偏移量的高级消费者 API。
在我的例子中,我们有 auto.commit.enable=false 以便在处理消息后向 Zookeeper 提交偏移量。如果处理失败,则不会提交偏移量,我们应该尝试在某个配置的时间从 Zookeeper 的偏移量开始再次处理相同的消息。但它不起作用,因为我认为 Apache Kafka 客户端在内存中保持偏移。
我发现 kafka.consumer.ConsumerIterator 处理偏移量,如果其中的 consumedOffset 大于 Zookeeper 中的值,那么它将使用已消耗的值读取消息。
那么,我想知道有什么方法可以重置 Kafka 客户端中的偏移量以从 Zookeeper 中的偏移量开始读取?
【问题讨论】:
标签: java spring spring-integration apache-kafka