【问题标题】:Instruct RabbitMQ to resend undelivered messages periodically指示 RabbitMQ 定期重新发送未传递的消息
【发布时间】:2016-04-07 06:41:40
【问题描述】:

背景

我们正在使用langohr 与 RabbitMQ 进行交互。我们尝试了两种不同的方法来让 RabbitMQ 重新发送我们的服务尚未正确处理的消息。一种可行的方法是发送basic.nack 并将requeue 设置为true,但这将立即重新发送消息,直到服务响应basic.ack。例如,如果服务尝试将消息持久保存到当前已关闭(并且已关闭一段时间)的数据存储区,这会有点问题。对我们来说,最好每 20 秒左右获取一次未传递的消息(即,如果数据存储关闭,我们不会执行 basic.ackbasic.nack,我们只是让消息保留在队列中)。我们尝试使用ExecutorService 来实现这一点,其要点是这样实现的:

(let [chan (lch/open conn)]  ; We create a new channel since channels in Langohr are not thread-safe
    (log/info "Triggering \"recover\" for channel" chan)
    (try
      (lb/recover chan)
      (catch Exception e (log/error "Failed to call recover" e))
      (finally (lch/close chan))))

不幸的是,这似乎不起作用(消息没有重新传递,只是保留在队列中)。如果我们重新启动服务,排队的消息将被正确使用。但是,我们还有其他使用spring-rabbitmq(Java 中)实现的服务,他们似乎开箱即用地处理了这个问题。我尝试查看 source code 以了解他们是如何做到的,但我还没有做到。

问题

您如何指示 RabbitMQ 定期(重新)传递队列中的消息(最好使用 Langohr)?

【问题讨论】:

    标签: rabbitmq spring-amqp spring-rabbit langohr


    【解决方案1】:

    我不确定您在使用 Spring AMQP 应用程序做什么,但 RabbitMQ 中没有为此内置任何内容。

    但是,使用 TTL 设置死信非常容易,以便在一段时间后重新排队返回原始队列。有关示例、链接等,请参见 this answer

    编辑

    然而,Spring AMQP确实有一个retry interceptor which can be configured to suspend the consumer thread for some period(s) during retry

    有状态的重试拒绝和重新排队; stateless retry 在内部处理重试,在重试期间不与 broker 交互。

    【讨论】:

    • 谢谢,那我们肯定误会了spring amqp。当我们调查我们的一个生产系统中的行为时,我们认为它具有这种行为,但我们很可能是错误的。非常感谢您澄清这一点。你知道 spring-rabbitmq 是只用 requeue 做 nack 还是做一些更花哨的事情?
    • basicNack 与 requeue true/false 取决于您是要立即重试还是拒绝/发送到 DLX/DLQ。另请参阅我的编辑。
    【解决方案2】:

    请参阅this answer,其中有说明:我们对消息进行 Nack,nack 将消息放入保持队列 N 秒,然后 TTL 离开该队列并进入另一个队列,将其放回原始队列。

    设置需要一些工作,但效果很好!

    【讨论】:

      猜你喜欢
      • 2016-05-09
      • 1970-01-01
      • 1970-01-01
      • 2018-03-21
      • 1970-01-01
      • 2014-11-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多