【发布时间】:2018-03-08 21:31:55
【问题描述】:
我在我的框架中使用 Kafka 生产者 - 消费者模型。在消费者端消费的记录稍后会被索引到 elasticsearch 上。在这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者,直到 ES 启动,一旦启动,我需要恢复消费者并使用我上次离开的记录。 我认为@KafkaListener 无法做到这一点。谁能给我一个解决方案?我发现我需要为此编写自己的 KafkaListenerContainer,但我无法正确实现它。任何帮助将不胜感激。
【问题讨论】:
-
如果您在记录被索引到 ES 后在消费者端提交偏移量,那么您不必担心“从我上次离开的地方消费记录”(您的第二个问题)。当 ES 关闭时,您将无法索引,您不会提交偏移量,因此 kakfka 将再次重试您将收到相同的消息。
标签: java spring-boot apache-kafka kafka-consumer-api