【问题标题】:How to ask RabbitMQ to retry when business Exception occurs in Spring Asynchronous MessageListener use caseSpring异步MessageListener用例发生业务异常时如何让RabbitMQ重试
【发布时间】:2016-05-02 10:05:54
【问题描述】:

我正在运行一个 Spring AMQP 消息侦听器。

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}

如您所见,在处理过程中可能会出现异常。由于 Catch 块中的特定错误,我想重试。我无法通过 onMessage 中的异常。 如何告诉 RabbitMQ 有异常并重试?

【问题讨论】:

    标签: java spring rabbitmq


    【解决方案1】:

    由于onMessage() 不允许抛出已检查的异常,您可以将异常包装在RuntimeException 中并重新抛出它。

    try {
        testService.process(message);
    } catch (BusinessException e) {
        throw new RuntimeException(e);
    }
    

    但是请注意,这可能会导致邮件无限期地重新发送。以下是它的工作原理:

    RabbitMQ 支持拒绝消息并要求代理重新排队。这显示为here。但是 RabbitMQ 本身并没有重试策略的机制,例如设置最大重试次数、延迟等。

    使用 Spring AMQP 时,“requeue on reject”是默认选项。 Spring 的SimpleMessageListenerContainer 默认情况下会在出现未处理的异常时执行此操作。所以在你的情况下,你只需要重新抛出捕获的异常。但是请注意,如果您无法处理消息并且总是抛出异常,则该异常将无限期地重新传递并导致无限循环。

    您可以通过抛出 AmqpRejectAndDontRequeueException 异常来覆盖每条消息的此行为,在这种情况下,消息将不会被重新排队。

    你也可以通过设置完全关闭SimpleMessageListenerContainer的“requeue on reject”行为

    container.setDefaultRequeueRejected(false) 
    

    当一条消息被拒绝并且没有重新排队时,如果在 RabbitMQ 中设置了一个,它将丢失或转移到 DLQ。

    如果您需要具有最大尝试、延迟等的重试策略,最简单的方法是设置一个弹簧“无状态”RetryOperationsInterceptor,它将在线程内进行所有重试(使用Thread.sleep()),而不会在每次重试时拒绝消息(因此每次重试都无需返回 RabbitMQ)。当重试用尽时,默认情况下将记录一个警告并使用该消息。如果您想发送到 DLQ,您将需要 RepublishMessageRecoverer 或自定义 MessageRecoverer 来拒绝消息而不重新排队(在后一种情况下,您还应该 setup 队列上的 RabbitMQ DLQ)。使用默认消息恢复器的示例:

    container.setAdviceChain(new Advice[] {
            org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                    .stateless()
                    .maxAttempts(5)
                    .backOffOptions(1000, 2, 5000)
                    .build()
    });
    

    这显然有一个缺点,即您将在整个重试期间占用线程。您还可以选择使用“有状态”RetryOperationsInterceptor,它将在每次重试时将消息发送回 RabbitMQ,但延迟仍将在应用程序内使用Thread.sleep() 实现,另外设置有状态拦截器是更复杂一点。

    因此,如果您希望在不占用Thread 的情况下延迟重试,您将需要在 RabbitMQ 队列上使用 TTL 的更复杂的自定义解决方案。如果您不想要指数退避(因此每次重试时延迟不会增加),它会更简单一些。要实现这样的解决方案,您基本上在rabbitMQ 上创建另一个带有参数的队列:"x-message-ttl": <delay time in milliseconds>"x-dead-letter-exchange":"<name of the original queue>"。然后在主队列上设置"x-dead-letter-exchange":"<name of the queue with the TTL>"。所以现在当你拒绝并且不重新排队消息时,RabbitMQ 会将它重定向到第二个队列。当 TTL 过期时,它将被重定向到原始队列,从而重新传递给应用程序。所以现在你需要一个重试拦截器,它在每次失败后拒绝发送给 RabbitMQ 的消息,并跟踪重试计数。为了避免需要在应用程序中保持状态(因为如果您的应用程序是集群的,您需要复制状态),您可以从 RabbitMQ 设置的 x-death 标头计算重试计数。查看有关此标头 here 的更多信息。因此,此时实现自定义拦截器比使用这种行为自定义 Spring 有状态拦截器更容易。

    同时检查the section about retries in the Spring AMQP reference

    【讨论】:

    • 感谢 Nazreet 提供非常详细的解释。我还有几个疑问。很明显,我必须抛出运行时异常才能重试消息。还使用限制重试尝试的 RetryInterceptor。 1. 在什么情况下我们会使用有状态拦截器/无状态拦截器? 2. 此外,配置了重试选项后,其他消息是否会无人看管,直到所有重试完成?重试rt应该是一个单独的线程吗? 3. 为重试和自定义重试实现创建一个单独的队列怎么样?
    • 我很高兴它有帮助。回答您的问题: 1)有状态/无状态重试拦截器通常用于 Spring(批处理、集成、amqp 等),因此它们具有各种用例。但是在 AMQP 消息消费者的情况下,我认为有状态并没有像我提到的那样提供很大的好处。不同之处在于,有状态的将在每次重试时将消息发送回 RabbitMQ(但在休眠后,如果配置了退避)。
    • 2) 其他消息会发生什么取决于并发消费者的数量。每个消费者都是一个单独的线程。您可以使用 container.setConcurrentConsumers() 进行设置。 3)自定义重试实现在评论中解释非常复杂。我稍后会找时间解释。
    • 与有状态重试拦截器相关的另一个问题是,如果您有一个集群应用程序(多个节点和消费者在同一个队列上),您需要找到一种跨节点复制状态的方法。否则,在最坏的情况下,您可能会获得双倍的重试次数。
    • 添加了使用 RabbitMQ TTL 的不涉及 Thread.sleep 的退避解决方案的简要说明。如果您需要指数退避,它会更加复杂。希望对您有所帮助。
    猜你喜欢
    • 1970-01-01
    • 2022-06-11
    • 2011-10-05
    • 2018-11-18
    • 1970-01-01
    • 2012-08-16
    • 2019-04-12
    • 1970-01-01
    • 2012-01-27
    相关资源
    最近更新 更多