【问题标题】:How to pause a kafka consumer?如何暂停卡夫卡消费者?
【发布时间】:2018-03-08 21:31:55
【问题描述】:

我在我的框架中使用 Kafka 生产者 - 消费者模型。在消费者端消费的记录稍后会被索引到 elasticsearch 上。在这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者,直到 ES 启动,一旦启动,我需要恢复消费者并使用我上次离开的记录。 我认为@KafkaListener 无法做到这一点。谁能给我一个解决方案?我发现我需要为此编写自己的 KafkaListenerContainer,但我无法正确实现它。任何帮助将不胜感激。

【问题讨论】:

  • 如果您在记录被索引到 ES 后在消费者端提交偏移量,那么您不必担心“从我上次离开的地方消费记录”(您的第二个问题)。当 ES 关闭时,您将无法索引,您不会提交偏移量,因此 kakfka 将再次重试您将收到相同的消息。

标签: java spring-boot apache-kafka kafka-consumer-api


【解决方案1】:

有几种可能的解决方案,一种简单的方法是使用 KafkaConsumer API。在 KafkaConsumer 中,实现跟踪主题的位置,下次调用 poll(...) 时将检索该位置。您的问题是从 Kafka 获取记录后,您可能无法将其插入 Elastic Search。在这种情况下,您必须编写一个例程来重置消费者的位置,在您的情况下将是 consumer.seek(partition, consumer.position(partition)-1)。这会将位置重置为较早的位置。此时,一个好的方法是暂停分区(这将使服务器能够进行一些资源清理),然后轮询 ES(通过您想要的任何机制)。一旦 ES 可用,就调用消费者的 resume 并继续您通常的轮询插入周期。

讨论后编辑

使用指定的生命周期方法创建一个 spring bean。在 bean 的初始化方法中实例化您的 KafkaConsumer(从任何来源检索消费者的配置)。从方法开始一个线程与消费者交互并更新 ES,其余的设计如上。这是一个单线程模型。为了获得更高的吞吐量,请考虑将从 Kafka 检索到的数据保存在内存队列中的小型内存队列中,并使用调度程序线程来获取消息并将其提供给池化线程以更新 ES。

【讨论】:

  • 我了解如何做到这一点的理论,想到了一些与您的答案非常接近的东西,我的问题在于实施。能否请您发布一个相同的代码 sn-p?
  • 你可能不能用注解来做到这一点,如果你使用KafkaConsumer没问题,实现很简单,所有列出的方法都可用。您在实施中遇到的问题
  • 我的 Consumer 是一个使用 @KafkaListener 注解的 POJO 监听器。我有一个用于此消费者配置的 ConsumerConfig 类。我的监听方法如下:@KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory") public void process(ConsumerRecord, ?> record) { logger.info("record: "+record);我不知道从哪里获取“消费者”对象以应用搜索和其他方法。
  • 据我了解,您使用 Spring 的开发模型给您带来了问题,因为您没有直接使用 KafkaConsumer 的灵活性。您正在部署哪个容器。
  • 是的,那么我该如何在 SPRING 中实现呢?
【解决方案2】:

我建议宁愿暂停消费者,为什么你不能一次又一次地重试同一条消息,并在消息被成功消费后提交偏移量。

例如:

@Retryable注释你的方法

并使用 try/catch 阻止您的方法,并在 catch 块中抛出新异常。

对于 ListenerFactory 配置添加属性:

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);

【讨论】:

    【解决方案3】:

    有几种方法可以实现这一目标。

    方法#1

    在 Thread 中创建您的 KafkaConsumer 对象并运行无限 while 循环以使用事件。

    完成此设置后,您可以中断线程并在while 循环中检查Thread.interrupt() 是否为true。如果是,则跳出循环并关闭消费者。

    完成恢复活动后,使用相同的组 ID 重新创建使用者。请注意,这可能会重新平衡消费者。

    如果您使用 python,可以使用线程 stop_event 来实现同样的事情。

    方法#2

    使用 KafkaConumer API pause(partitions_list) 函数。它接受 Kafka 分区作为输入。因此,提取分配给消费者的所有部分并将这些部分传递给pause(partitions_list) 函数。消费者将停止从这些分区中提取数据。

    一定时间后,可以使用resume(partitions_list)函数恢复消费者。此方法不会重新平衡消费者。

    注意:如果您使用的是 Spring Kafka 客户端。这变得容易多了。您可以启动/停止消息侦听器容器。

    你可以找到详细的解释here

    【讨论】:

      猜你喜欢
      • 2019-07-03
      • 2018-05-05
      • 2021-08-22
      • 1970-01-01
      • 1970-01-01
      • 2020-10-28
      • 2015-12-18
      • 2019-03-27
      • 2017-01-07
      相关资源
      最近更新 更多