【问题标题】:Change Active MQ RedeliveryPolicy for the embedded ActiveMQ in Sprint Boot在 Spring Boot 中更改嵌入式 ActiveMQ 的 Activemq RedeliveryPolicy
【发布时间】:2018-03-17 08:01:03
【问题描述】:

在使用 Spring Boot 时如何更改嵌入式 ActiveMQ 的重新交付策略?我尝试在 DefaultJmsListenerContainerFactory 上指定 FixedBackOff 但它没有帮助。下面是我用来初始化 jms 工厂 bean 的代码。我有一个消息消费者处理队列中的传入消息。由于资源不可用,在处理过程中,我抛出了一个检查异常。我希望在固定时间间隔后重新发送消息以进行处理。

Spring Boot:1.5.7.Release

Java:1.7

@Bean
public JmsListenerContainerFactory<?> publishFactory(ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = 
        new DefaultJmsListenerContainerFactory();

    factory.setBackOff(new FixedBackOff(5000, 5));

    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);

    // You could still override some of Boot's default if necessary.
    factory.setErrorHandler(new ErrorHandler() {

        @Override
        public void handleError(Throwable t) {
            LOG.error("Error occured in JMS transaction.", t);
        }

    }); 
    return factory;
}

消费者代码:

@JmsListener(destination = "PublishQueue", containerFactory = "publishFactory")
@Transactional
public void receiveMessage(PublishData publishData) {
    LOG.info("Processing incoming message on publish queue with transaction id: " + publishData.getTransactionId());

    PublishUser user = new PublishUser();
    user.setPriority(1);
    user.setUserId(publishData.getUserId());

    LOG.trace("Trying to enroll in the publish lock queue for user: " + user);
    PublishLockQueue lockQueue = publishLockQueueService.createLock(user);
    if (lockQueue == null)
        throw new RuntimeException("Unable to create lock for publish");
    LOG.trace("Publish lock queue obtained with lock queue id: " + lockQueue.getId());

    try {
        publishLockQueueService.acquireLock(lockQueue.getId());
        LOG.trace("Acquired publish lock.");
    }
    catch (PublishLockQueueServiceException pex) {
        throw new RuntimeException(pex);
    }

    try {
        publishService.publish(publishData, lockQueue.getId());
        LOG.trace("Completed publish of changes.");

        sendPublishSuccessNotification(publishData);
    }
    finally {
        LOG.trace("Trying to release lock to publish.");
        publishLockQueueService.releaseLock(lockQueue.getId());
    }

    LOG.info("Publish has been completed for transaction id: " + publishData.getTransactionId());
}

【问题讨论】:

  • 它是消费者,您需要使用事务确认模式让消费者回滚异常,并让 ActiveMQ 能够将消息重新传递给同一个消费者,或者如果您有多个消费者正在运行,则可以将消息重新传递给另一个消费者.但是,您可以在 ActiveMQ 上配置重新传递选项,例如退避等。上面的错误处理程序只是一个 noop 侦听器,除了记录之外不能做很多事情。
  • 你能分享你的代码吗?
  • @Makoton 我也包含了消费者代码。
  • @ClausIbsen 我遇到的问题与消息的重新传递无关,而是与时间间隔有关。一旦发生异常,它当前会立即重新传递消息。我想要的是在一定的时间间隔后重新发送消息

标签: spring-boot activemq spring-jms


【解决方案1】:

@claus 回答:我测试过它可以工作:

它是消费者,您需要使用事务确认模式让消费者回滚异常,并让 ActiveMQ 能够将消息重新传递给同一个消费者,或者如果您有多个消费者正在运行,则可以将消息重新传递给另一个消费者。但是,您可以在 ActiveMQ 上配置重新传递选项,例如退避等。上面的错误处理程序只是一个 noop 侦听器,除了记录之外不能做很多事情

【讨论】:

  • 正如我在 cmets 中提到的,我的问题不在于重新交付,而在于重新交付的间隔。我知道我可以直接使用 ActiveMQ 连接工厂对象来指定退避。这意味着我需要包含特定于 ActiveMQ 的代码。我希望不要这样做,只需使用 Spring DefaultJmsListenerContainerFactory#setBackOff 方法指定退避策略
猜你喜欢
  • 2021-08-09
  • 2018-01-04
  • 2017-09-23
  • 2019-03-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-10-14
  • 2018-07-23
相关资源
最近更新 更多