【问题标题】:How to gracefully stop consuming messages with @RabbitListener如何优雅地停止使用@RabbitListener 消费消息
【发布时间】:2019-09-23 07:12:00
【问题描述】:

有没有办法优雅地停止 ListenerContainer 及其关联的 Consumers

我正在努力实现的目标。

  1. 停止使用消息。
  2. 优雅地停下ListenerContainer
  3. 等待长时间运行的消费者,完成后确认。

我可以使用consumer.stop() 停止ListenerContainers,但是活动的长时间运行的消费者不会成功完成,并且处理的消息将不会被确认,因此将在 ListenerContainer 恢复后再次处理.

输出

Waiting for workers to finish.
Workers not finished.
Closing channel for unresponsive consumer: Consumer@6d229b1c

消息已处理,但未确认。

我或许可以使用setForceCloseChannel(false) 实现正常关闭,但是否可以验证取消的消费者是否已完成? SimpleMessageListenerContainer.doShutDown() 有一个本地范围的列表“cancelledConsumers”。

【问题讨论】:

    标签: java spring rabbitmq spring-amqp spring-rabbit


    【解决方案1】:

    增加关机超时时间。

    Message Listener Container Configuration

    关机超时

    当容器关闭时(例如,如果其封闭的 ApplicationContext 已关闭),它会等待处理中的消息,直至达到此限制。默认为五秒。

    /**
     * The time to wait for workers in milliseconds after the container is stopped. If any
     * workers are active when the shutdown signal comes they will be allowed to finish
     * processing as long as they can finish within this timeout. Defaults
     * to 5 seconds.
     * @param shutdownTimeout the shutdown timeout to set
     */
    public void setShutdownTimeout(long shutdownTimeout) {
    

    【讨论】:

    • 感谢您的回复。即使使用 setShutdownTimeout,也可能不会确认最后消费的消息。我相信最好的选择是使用 setForceCloseChannel(false),但是我无法确定取消的消费者是否已经完成。我可能必须扩展 SimpleRabbitListenerContainerFactory,并重新实现/扩展 SimpleMessageListenerContainer 以获取 cancelledConsumers。
    • >Even with setShutdownTimeout, the last consumed message might not be acked 为什么这么说?使超时值很大; stop() 将阻塞直到最后一个消费者线程完成。您可以添加ApplicationListener<AsyncConsumerStoppedEvent>(或@EventListener方法)以在每个消费者线程退出时获取事件)。见Consumer Events。如果需要,您还可以将 force 设置为接近 false 作为额外保护。
    • 无论我们设置多高的setShutdownTimeout,消费者都会继续处理新到达的消息,直到超时。因此,如果消费者在达到超时之前开始处理消息,则不会确认消息。我可以在github上整理一个例子。
    • 不,不会; doShutdown() 取消消费者,因此不再有消息到达。此外,消费者在循环中运行while(isActive()...shutDown() 在调用doShutDown() 之前将active 设置为false。
    • 你是对的,我的错。只有当setShutdownTimeout 小于初始numberOfMessages * consumerExecutionTimePerMessage 时,才会出现“问题”。我很好。谢谢。
    【解决方案2】:

    要完成@user634545 的最后一条评论,我们必须处理2 个属性:

    必须调整这些参数才能验证以下内容:

    prefetchCount < shutdownTimeout * consumerExecutionTimePerMessage
    

    这意味着在收到关闭命令后,消费者应该能够消费并知道每条预取的消息。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-09-23
      • 1970-01-01
      • 2011-08-12
      • 1970-01-01
      • 1970-01-01
      • 2021-03-19
      • 2017-08-18
      • 1970-01-01
      相关资源
      最近更新 更多