【发布时间】:2019-10-07 06:42:44
【问题描述】:
我有一个简单的 Spring Boot 服务,它使用 JMSTemplate 监听 AWS SQS 队列。正确处理消息后,一切都会按预期工作。
我正在使用 CLIENT_ACKNOWLEDGE,因此当处理过程中抛出异常时,会再次收到消息。但是,SQS 队列上的默认可见性超时设置将被忽略,并且会立即再次接收消息。
在将消息放入 DLQ 之前,SQS 队列配置了 30 秒的默认可见性超时和 20 次接收的重新驱动策略。
我已禁用该服务并使用 SQS 控制台验证默认可见性超时设置是否正确。我还尝试将 JMS 消息添加到方法签名并执行手动验证。
这是 JMS 配置的代码:
@Configuration
@EnableJms
class JmsConfig
{
@Bean
@Conditional(AWSEnvironmentCondition.class)
public SQSConnectionFactory connectionFactory(@Value("${AWS_REGION}") String awsRegion)
{
return new SQSConnectionFactory(
new ProviderConfiguration(),
AmazonSQSClientBuilder.standard()
.withRegion(Regions.fromName(awsRegion))
.withCredentials(new DefaultAWSCredentialsProviderChain())
);
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory)
{
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("3-10");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setErrorHandler(defaultErrorHandler());
return factory;
}
@Bean
public ErrorHandler defaultErrorHandler()
{
return new ErrorHandler()
{
@Override
public void handleError(Throwable throwable)
{
LOG.error("JMS message listener error: {}", throwable.getMessage());
}
};
}
@Bean
public JmsTemplate defaultJmsTemplate(ConnectionFactory connectionFactory)
{
return new JmsTemplate(connectionFactory);
}
}
这里是监听器的代码:
@Component
public class MessagingListener
{
@Autowired
private MessageService _messageService;
@Autowired
private Validator _validator;
@JmsListener(destination = "myqueue")
public void receiveMessage(String messageJson)
{
try
{
LOG.info("Received message");
// The following line throws an IOException is the message is not JSON.
MyMessage myMessage = MAPPER.readvalue(messageJson, MyMessage.class);
Set<ConstraintViolation<MyMessage>> _validator.validate(myMessage);
if (CollectionUtils.isNotEmpty(violations))
{
String errorMessage = violations.stream()
.map(v -> String.join(" : ", v.getPropertyPath().iterator().next().getName(),
v.getMessage()))
LOG.error("Exception occurred while validating the model, details: {}", errorMessage)
throw new ValidationException(errorMessage);
}
}
catch (IOException e)
{
LOG.error("Error parsing message", e);
throw new ValidationException("Error parsing message, details: " + e.getMessage());
}
}
}
当带有无效 JSON 或未通过验证的 JSON 将消息放入 SQS 队列时,消息会很快收到 20 次,然后在 DLQ 上结束。需要做什么才能尊重 SQS 中的默认可见性超时设置?
【问题讨论】:
-
您找到解决方案了吗?
-
也面临这个问题,有什么见解吗?
标签: java amazon-sqs spring-jms jmstemplate