【问题标题】:How do I seek to end of a kafka topic when I am creating a new consumer in an existing consumer group with akka streams?当我在具有 akka 流的现有消费者组中创建新消费者时,如何寻求结束 kafka 主题?
【发布时间】:2019-11-06 08:33:02
【问题描述】:

我有一个 akka 应用程序(在 JAVA 中),它使用 commitablePartitionedSource 来使用来自 kafka 主题的消息。我有一些消费者群体可以为多个主题吸引消费者。这是由动态配置驱动的,我可以在其中暂时关闭消费者,并可能在稍后重新启动它们。

当这个消费者重新启动时,我只想阅读新消息,而不是从我离开的地方开始。

有没有办法从 akka-alpakka 消费者那里获取 kafkaConsumer 对象,以便在处理之前可以 seekToEnd() ? 请让我知道是否有其他方法可以实现这一目标?也许使用 akka 配置或不同类型的消费者?我不想维护自己的偏移量(希望不是唯一的选择)

我的配置设置为在我启动消费者组时获取 latest 偏移量,但由于我正在关闭并重新启动单个消费者,它总是从我离开的地方开始消费。

我尝试为一个主题创建一个消费者组,但我有很多主题,而且结果非常耗费资源。我还寻找一种方法来清除存储在 kafka 中的该主题的偏移量,但未成功。

【问题讨论】:

    标签: java scala apache-kafka akka akka-stream


    【解决方案1】:

    最简单的方法就是在每次启动消费者时创建一个新的消费者组。 Kafka 将在可配置的时间 (retention.ms) 后删除陈旧的消费者组

    如果您很少重新启动消费者并且总是希望它处理新数据而不是赶上所有丢失的消息,则此策略很好。

    编辑

    据我所知,访问底层 KafkaConsumer 的唯一方法是使用 committableExternalSource。这样您就可以访问seekToEnd 方法,但是您还需要注意订阅提供每个分区的起始偏移量的主题(类似于您现在设置committablePartitionedSource 但在外部阿卡)。

    【讨论】:

    • 感谢您的回复。遗憾的是,这种方法需要太多消费者。
    【解决方案2】:

    commitablePartitionedSourceAutoSubscription 作为输入,您不能指定偏移量。

    你需要的是一个接受ManualSubscription或更高级别Subscription的方法,比如

    > plainExternalSource
    > committableExternalSource
    > plainSource
    ...
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-12-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-09-19
      相关资源
      最近更新 更多