【问题标题】:SQS Message visibility timeout being set to 0 when exception is thrown and @JMSListener当抛出异常和@JMSListener 时,SQS 消息可见性超时设置为 0
【发布时间】:2019-10-07 06:42:44
【问题描述】:

我有一个简单的 Spring Boot 服务,它使用 JMSTemplate 监听 AWS SQS 队列。正确处理消息后,一切都会按预期工作。

我正在使用 CLIENT_ACKNOWLEDGE,因此当处理过程中抛出异常时,会再次收到消息。但是,SQS 队列上的默认可见性超时设置将被忽略,并且会立即再次接收消息。

在将消息放入 DLQ 之前,SQS 队列配置了 30 秒的默认可见性超时和 20 次接收的重新驱动策略。

我已禁用该服务并使用 SQS 控制台验证默认可见性超时设置是否正确。我还尝试将 JMS 消息添加到方法签名并执行手动验证。

这是 JMS 配置的代码:

@Configuration
@EnableJms
class JmsConfig
{

    @Bean
    @Conditional(AWSEnvironmentCondition.class)
    public SQSConnectionFactory connectionFactory(@Value("${AWS_REGION}") String awsRegion)
    {
        return new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                                  .withRegion(Regions.fromName(awsRegion))
                                  .withCredentials(new DefaultAWSCredentialsProviderChain())
        );
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory)
    {
        DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setConcurrency("3-10");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setErrorHandler(defaultErrorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler defaultErrorHandler()
    {
        return new ErrorHandler()
        {
            @Override
            public void handleError(Throwable throwable)
            {
                LOG.error("JMS message listener error: {}", throwable.getMessage());
            }
        };
    }

    @Bean
    public JmsTemplate defaultJmsTemplate(ConnectionFactory connectionFactory)
    {
        return new JmsTemplate(connectionFactory);
    }
}

这里是监听器的代码:

@Component
public class MessagingListener
{
    @Autowired
    private MessageService _messageService;

    @Autowired
    private Validator _validator;

    @JmsListener(destination = "myqueue")
    public void receiveMessage(String messageJson)
    {
        try
        {
            LOG.info("Received message");

            // The following line throws an IOException is the message is not JSON.
            MyMessage myMessage = MAPPER.readvalue(messageJson, MyMessage.class);

            Set<ConstraintViolation<MyMessage>> _validator.validate(myMessage);
            if (CollectionUtils.isNotEmpty(violations))
            {
                String errorMessage = violations.stream()
                        .map(v -> String.join(" : ", v.getPropertyPath().iterator().next().getName(),
                                v.getMessage()))
                LOG.error("Exception occurred while validating the model, details: {}", errorMessage)
                throw new ValidationException(errorMessage);
            }
        }
        catch (IOException e)
        {
            LOG.error("Error parsing message", e);
            throw new ValidationException("Error parsing message, details: " + e.getMessage());
        }
    }
}

当带有无效 JSON 或未通过验证的 JSON 将消息放入 SQS 队列时,消息会很快收到 20 次,然后在 DLQ 上结束。需要做什么才能尊重 SQS 中的默认可见性超时设置?

【问题讨论】:

  • 您找到解决方案了吗?
  • 也面临这个问题,有什么见解吗?

标签: java amazon-sqs spring-jms jmstemplate


【解决方案1】:

如果出现异常,通过ChangeMessageVisibility 将失败消息的可见性超时设置为 0,因此即使队列具有不同的visibilityTimeout 设置,SQS 也会立即发送此消息。

这是怎么发生的?

如您所见 here,Spring JMS 的 AbstractMessageListenerContainer 简要地这样做了:

try {
    invokeListener(session, message); // This is your @JMSListener method
}
catch (JMSException | RuntimeException | Error ex) {
    rollbackOnExceptionIfNecessary(session, ex);
    throw ex;
}
commitIfNecessary(session, message);

rollbackOnExceptionIfNecessary 方法上,session.recover() 将被调用,因为:

  1. session.getTransacted() 将始终为 false,因为 SQS 不支持事务。见here
  2. isClientAcknowledge(session) 将返回 true,因为您使用的是 CLIENT_ACKNOWLEDGE 模式。

最后recover() of SQSSession 否定确认该消息,这意味着将该特定消息的visibilityTimeout 设置为0,导致SQS 立即尝试发送该消息。

覆盖此行为的最简单方法是实现CustomJmsListenerContainerFactoryCustomMessageListenerContainer,而不是使用DefaultJmsListenerContainerFactoryDefaultMessageListenerContainer

public class CustomMessageListenerContainer extends DefaultMessageListenerContainer {

    public CustomMessageListenerContainer() {
        super();
    }

    @Override
    protected void rollbackOnExceptionIfNecessary() {
        // do nothing, so that "visibilityTimeout" will stay same
    }

}

public class CustomJmsListenerContainerFactory {
    
    @Override
    protected DefaultMessageListenerContainer createContainerInstance() {
        return new CustomMesageListenerContainer();
    }
}

并使用@Component 或就像您在JmsConfig 中所做的那样使其成为Spring bean:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // and set other stuff on factory
    return factory;
}

注意
如果您的应用程序使用 JMS 和 SQS 一起使用其他类型的数据源,请确保为它们使用不同的 Container 和 ContainerFactory,以便 rollbackOnExceptionIfNecessary 的行为符合预期。

【讨论】:

  • 据我测试,这很有效。在更新等情况下,我通常不喜欢这种压倒一切的行为,但我真的找不到任何其他方式,甚至没有提到这一点。在“rollbackOnExceptionIfNecessary”上执行此覆盖会破坏什么?
  • 我在答案中提到了这一点。您可以查看答案中的session.recover() 链接。它基本上只是否定确认消息,这意味着将可见性超时设置为 0。
猜你喜欢
  • 1970-01-01
  • 2017-10-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-02
  • 2017-05-09
相关资源
最近更新 更多