【问题标题】:Spring JMS load balancing with one Producer and one ConcumerSpring JMS 负载均衡与一个生产者和一个消费者
【发布时间】:2018-06-13 22:09:10
【问题描述】:

我对 JMS 负载平衡研究了很长时间。我们可以创建多个生产者和多个消费者来对 JMS 消息进行负载平衡。但我想了解,我们如何通过一个生产者和一个消费者来负载平衡 JMS 消息。我无法在我的项目中添加更多依赖项,例如 Apache Camel。

@Configuration
@EnableJms
@ComponentScan({"com.jmsloadbalance.jms"})
@Bean
public class JmsConfig {
public JmsTemplate getJmsTemplate() {
    JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(connectionFactory());
    template.setDefaultDestination(new ActiveMQQueue("default.topic");
    template.setExplicitQosEnabled(true);
    template.setDeliveryPersistent(false);
    template.setTimeToLive(60000);
    template.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
    template.setMessageConverter(getMessageConverter());
    return template;
}

@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setPubSubDomain(false);
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setConcurrency("1");
    factory.setMessageConverter(getMessageConverter());
    return factory;
}

private ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("vm://localhost");
    return factory;
}

private MessageConverter getMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTypeIdPropertyName("JMSType");
    return converter;
}
}

这是我的 JmsConfig 类,我无法在其中进行大的配置更改,例如引入更多 JMSTemplate 或更多 ConnectionFactory。我的制作人如下所示

@Service("accountJmsProducer")
public class AccountJmsProducer {

private static Logger LOG = Logger.getLogger(AccountJmsProducer.class);

@Autowired
private JmsTemplate template;

private Destination destination;

public Account create(Account account) {
    if (this.destination == null) {
        this.destination = new ActiveMQQueue("account.create");
    }
    template.convertAndSend(destination, account);
    return null;
}
}

我的消费者如下所示:

@Service("accountJmsConsumer")
public class AccountJmsConsumer {

private static final Logger LOG = Logger.getLogger(AccountJmsConsumer.class);

@Autowired
@Qualifier("accountService")
private AccountService accountService;

private Account lastReceived;

@JmsListener(containerFactory = "defaultJmsListenerContainerFactory", destination = "account.create")
public Account create(Account account) {
    LOG.warn("Received " + account);
    setLastReceived(account);
    return accountService.create(account);
}
public synchronized Account getLastReceived() {
    return lastReceived;
}
public synchronized void setLastReceived(Account lastReceived) {
    this.lastReceived = lastReceived;
}
}

【问题讨论】:

  • 我一定错过了什么。你的问题没有意义;当只有一个消费者时,如何在消费者之间“负载平衡”?根据定义,他会收到所有消息。
  • @GaryRussell,我承认我的问题不清楚。我只想说我不能更改 JmsConfig 类。但如果需要,我愿意接受其他更改。如果我创建多个消费者是否有可能,但每个消费者应该只收到唯一的消息。一条消息不应由多个消费者处理。如果这是可能的,那么就有可能在具有上述配置的一个生产者的多个消费者之间进行负载平衡。我想知道这是否可行,那我该怎么做呢?

标签: spring activemq load-balancing spring-jms


【解决方案1】:

当有一个消费者时,不清楚你所说的负载平衡是什么意思,但根据你对我对你问题的评论的评论:

只要目标是一个队列(不是主题)并且这是隐含的,因为您有factory.setPubSubDomain(false),那么它就可以工作。它是 JMS 合同的一部分。如果同一个队列中有多个消费者,消息将在这些消费者之间分发;只有一个消费者会收到一条特定的消息。

如果交付失败,它可能会或可能不会重新交付给同一消费者。

大多数代理(包括 ActiveMQ)都提供某种预取机制。 IIRC,使用 ActiveMQ 默认为 1000。如果您的消息少于此数量,则可能有一个消费者处于空闲状态;如果是这样,请减少预取以调整分布。

【讨论】:

  • 如果我创建了多个消费者,那么它将在消费者中自动进行负载平衡。我想我应该为我的消费者创建另一个新类,比如@Service("accountJmsConsumerSecond") public class AccountJmsConsumerSecond。或者我必须写另一种方法,如@JmsListener(containerFactory = "defaultJmsListenerContainerFactory", destination = "account.create") public Account createSecond(Account account)
  • 为什么?只需设置factory.setConcurrency("2");,您将获得 2 个消费者。
猜你喜欢
  • 2017-07-13
  • 2018-03-07
  • 1970-01-01
  • 1970-01-01
  • 2019-04-30
  • 2011-06-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多