【发布时间】:2018-03-17 08:01:03
【问题描述】:
在使用 Spring Boot 时如何更改嵌入式 ActiveMQ 的重新交付策略?我尝试在 DefaultJmsListenerContainerFactory 上指定 FixedBackOff 但它没有帮助。下面是我用来初始化 jms 工厂 bean 的代码。我有一个消息消费者处理队列中的传入消息。由于资源不可用,在处理过程中,我抛出了一个检查异常。我希望在固定时间间隔后重新发送消息以进行处理。
Spring Boot:1.5.7.Release
Java:1.7
@Bean
public JmsListenerContainerFactory<?> publishFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setBackOff(new FixedBackOff(5000, 5));
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
LOG.error("Error occured in JMS transaction.", t);
}
});
return factory;
}
消费者代码:
@JmsListener(destination = "PublishQueue", containerFactory = "publishFactory")
@Transactional
public void receiveMessage(PublishData publishData) {
LOG.info("Processing incoming message on publish queue with transaction id: " + publishData.getTransactionId());
PublishUser user = new PublishUser();
user.setPriority(1);
user.setUserId(publishData.getUserId());
LOG.trace("Trying to enroll in the publish lock queue for user: " + user);
PublishLockQueue lockQueue = publishLockQueueService.createLock(user);
if (lockQueue == null)
throw new RuntimeException("Unable to create lock for publish");
LOG.trace("Publish lock queue obtained with lock queue id: " + lockQueue.getId());
try {
publishLockQueueService.acquireLock(lockQueue.getId());
LOG.trace("Acquired publish lock.");
}
catch (PublishLockQueueServiceException pex) {
throw new RuntimeException(pex);
}
try {
publishService.publish(publishData, lockQueue.getId());
LOG.trace("Completed publish of changes.");
sendPublishSuccessNotification(publishData);
}
finally {
LOG.trace("Trying to release lock to publish.");
publishLockQueueService.releaseLock(lockQueue.getId());
}
LOG.info("Publish has been completed for transaction id: " + publishData.getTransactionId());
}
【问题讨论】:
-
它是消费者,您需要使用事务确认模式让消费者回滚异常,并让 ActiveMQ 能够将消息重新传递给同一个消费者,或者如果您有多个消费者正在运行,则可以将消息重新传递给另一个消费者.但是,您可以在 ActiveMQ 上配置重新传递选项,例如退避等。上面的错误处理程序只是一个 noop 侦听器,除了记录之外不能做很多事情。
-
你能分享你的代码吗?
-
@Makoton 我也包含了消费者代码。
-
@ClausIbsen 我遇到的问题与消息的重新传递无关,而是与时间间隔有关。一旦发生异常,它当前会立即重新传递消息。我想要的是在一定的时间间隔后重新发送消息
标签: spring-boot activemq spring-jms