【问题标题】:Spring integration Kafka and managing offsetsSpring集成Kafka和管理偏移量
【发布时间】:2014-08-08 19:51:15
【问题描述】:

我正在使用 Spring Integration Kafka 扩展来读取和处理来自 Kafka in Java 应用程序的消息。据我所知,它使用不允许在 Zookeeper 中完全管理偏移量的高级消费者 API。

在我的例子中,我们有 auto.commit.enable=false 以便在处理消息后向 Zookeeper 提交偏移量。如果处理失败,则不会提交偏移量,我们应该尝试在某个配置的时间从 Zookeeper 的偏移量开始再次处理相同的消息。但它不起作用,因为我认为 Apache Kafka 客户端在内存中保持偏移。

我发现 kafka.consumer.ConsumerIterator 处理偏移量,如果其中的 consumedOffset 大于 Zookeeper 中的值,那么它将使用已消耗的值读取消息。

那么,我想知道有什么方法可以重置 Kafka 客户端中的偏移量以从 Zookeeper 中的偏移量开始读取?

【问题讨论】:

    标签: java spring spring-integration apache-kafka


    【解决方案1】:

    您不介意分享您的<int-kafka> 配置并指出您的问题所在吗?

    也许这样做就足够了MessageLeftOverTracker.clearMessagesLeftOver()

    我不熟悉 Kafka,但知道 Spring Integration 的作用。

    【讨论】:

      猜你喜欢
      • 2017-07-17
      • 1970-01-01
      • 1970-01-01
      • 2018-09-22
      • 2017-06-14
      • 2021-11-15
      • 2021-01-15
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多