【发布时间】:2019-04-26 12:03:05
【问题描述】:
目前我使用 AcknoledgeingMessageListener 来实现一个使用 spring-Kafka 的 Kafka 消费者。这个实现帮助我收听特定主题并使用手动确认处理消息。 我现在需要构建以下功能: 让我们假设,对于某个环境异常或通过该主题输入的一些不良数据,我需要从特定偏移量重放某个主题的数据。这将是手动触发(主要是通过执行 Java 类)。
如果我可以检索这些偏移量之间的消息并将其作为重播主题提供,这样新的消费者可以处理这些消息,从而保持原始主题上的偏移量不变,那将是理想的。
- CosumerSeekAware 接口 - 如果这是答案,我如何从外部触发它?通过让我们说一个 mvn -Dexec。我不确定这是否可能
- 另外假设我有一个崩溃时间戳,是否可以自省主题以找到与崩溃对应的偏移量,以便我可以从该偏移量重播?
- 我能否找到与某些特定数据相对应的偏移量,以便我可以重放这些特定的偏移量?
所有这些要求都是围绕我们的 Kafka 功能构建弹性层。我需要所有这些都由一个单独的可执行类管理,该类可以手动触发,提供相关数据(如时间戳等)。此类应确定偏移量,然后寻找该偏移量,检索与这些偏移量对应的消息并将它们发布到单独的主题。有人可以指出我正确的方向吗?恐怕我要绕圈子了。
【问题讨论】:
标签: spring-kafka