【问题标题】:Seek to an offset via an external trigger通过外部触发器寻找偏移量
【发布时间】:2019-04-26 12:03:05
【问题描述】:

目前我使用 AcknoledgeingMessageListener 来实现一个使用 spring-Kafka 的 Kafka 消费者。这个实现帮助我收听特定主题并使用手动确认处理消息。 我现在需要构建以下功能: 让我们假设,对于某个环境异常或通过该主题输入的一些不良数据,我需要从特定偏移量重放某个主题的数据。这将是手动触发(主要是通过执行 Java 类)。

如果我可以检索这些偏移量之间的消息并将其作为重播主题提供,这样新的消费者可以处理这些消息,从而保持原始主题上的偏移量不变,那将是理想的。

  1. CosumerSeekAware 接口 - 如果这是答案,我如何从外部触发它?通过让我们说一个 mvn -Dexec。我不确定这是否可能
  2. 另外假设我有一个崩溃时间戳,是否可以自省主题以找到与崩溃对应的偏移量,以便我可以从该偏移量重播?
  3. 我能否找到与某些特定数据相对应的偏移量,以便我可以重放这些特定的偏移量?

所有这些要求都是围绕我们的 Kafka 功能构建弹性层。我需要所有这些都由一个单独的可执行类管理,该类可以手动触发,提供相关数据(如时间戳等)。此类应确定偏移量,然后寻找该偏移量,检索与这些偏移量对应的消息并将它们发布到单独的主题。有人可以指出我正确的方向吗?恐怕我要绕圈子了。

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    以便新的消费者可以处理这些消息,从而保持原始主题上的偏移量不变。

    只需创建一个具有不同组 ID(新使用者)的新侦听器容器,并在分配分区时使用 ConsumerAwareRebalanceListener(或 ConsumerSeekAware)执行搜索。

    Here is a sample CARL that seeks all assigned topics based on a timestamp.

    您将需要一些机制来了解新消费者何时应该停止消费(此时您可以stop() 新容器)。也许在新消费者上设置max.poll.records=1,这样他就不会预取超过故障点。

    我不确定你所说的#3 是什么意思。

    【讨论】:

    • 1.是否可以运行新类(实现 ConsumerAwareRebalanceListener 或 ConsumerSeekAware 或者当您指的是容器时,它是一个单独的 mvn spring-boot:run?2。如果我知道结束偏移量有没有办法关闭容器?还有什么时候我调出这个容器,我怎样才能为其提供起始偏移量?3. 在#3 中 - 假设我只想处理那些具有特定 ID 的消息。所以我读取从偏移量开始的消息,并且只处理具有此 ID 4. 放入单独主题的策略对于实际重放数据是否正确?
    • 1.你可以做任何你喜欢的事情——单独的应用程序或主应用程序中的东西。 2、在容器上调用stop();您可以使用来自KafkaListenerEndpointRegistry 的 id 获取对它的引用 - 请参阅参考手册。只需在启动之前将起始偏移量传递给侦听器即可。 3. 是的;没有对 kafka 主题/分区的随机访问。 4. 我不知道你这是什么意思。
    • 最后一点 - 我的实际重播策略是让这个新的消费者组读取原始主题,检索消息,然后将其放入新主题中,然后单独使用。我希望这会保持原始偏移量不变,还是我遗漏了什么?
    • 感谢所有耐心的回答,我将尝试实现并让您知道。希望如果我遇到任何麻烦可以找你,我觉得这整件事有点令人生畏:(
    • 我不确定您所说的“保持原始偏移量不变”是什么意思 - 新消费者不会影响主要消费者的偏移量。您不必自己做任何这些;从 2.2 版开始,您可以使用 SeekToCurrentErrorHandlerDeadLetterPublishingRecoverer,框架会自动将一直失败的记录移动到另一个主题。见the reference manual
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-02-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多