【发布时间】:2016-04-07 06:41:40
【问题描述】:
背景
我们正在使用langohr 与 RabbitMQ 进行交互。我们尝试了两种不同的方法来让 RabbitMQ 重新发送我们的服务尚未正确处理的消息。一种可行的方法是发送basic.nack 并将requeue 设置为true,但这将立即重新发送消息,直到服务响应basic.ack。例如,如果服务尝试将消息持久保存到当前已关闭(并且已关闭一段时间)的数据存储区,这会有点问题。对我们来说,最好每 20 秒左右获取一次未传递的消息(即,如果数据存储关闭,我们不会执行 basic.ack 或 basic.nack,我们只是让消息保留在队列中)。我们尝试使用ExecutorService 来实现这一点,其要点是这样实现的:
(let [chan (lch/open conn)] ; We create a new channel since channels in Langohr are not thread-safe
(log/info "Triggering \"recover\" for channel" chan)
(try
(lb/recover chan)
(catch Exception e (log/error "Failed to call recover" e))
(finally (lch/close chan))))
不幸的是,这似乎不起作用(消息没有重新传递,只是保留在队列中)。如果我们重新启动服务,排队的消息将被正确使用。但是,我们还有其他使用spring-rabbitmq(Java 中)实现的服务,他们似乎开箱即用地处理了这个问题。我尝试查看 source code 以了解他们是如何做到的,但我还没有做到。
问题
您如何指示 RabbitMQ 定期(重新)传递队列中的消息(最好使用 Langohr)?
【问题讨论】:
标签: rabbitmq spring-amqp spring-rabbit langohr