【问题标题】:How can I use ActiveMQ and Spring Integration JMS to receive messages one time in multiple groups?如何使用 ActiveMQ 和 Spring Integration JMS 在多个组中一次接收消息?
【发布时间】:2017-06-29 08:23:50
【问题描述】:

当我们系统中的用户被创建时,我正在尝试使用消息传递服务向其他服务发送消息。接收消息的服务是负载平衡的 Spring Boot 应用程序(在我们的例子中,是 Kubernetes 环境中的 pod)。

我想要:

  1. 每个服务中的每个 pod 都配置为接收消息。

  2. 每个服务中的一个(并且只有一个)pod 实际接收每条消息。

  3. 这样消息可以被多个服务接收。

所以在图中,Notification Service 中的一个 pod 和 Logging Service 中的一个 pod 接收到“New User”消息。

我已经设置了 ActiveMQ 和 Spring Integration 来发送/接收消息,并将其作为 a) 队列(一个接收者)和 b) 主题(订阅者接收它)。问题是:

a) 对于队列,一个接收者意味着 Notifications 会收到它,但 Logging 不会(反之亦然)。

b) 对于一个主题,订阅者意味着所有六个 pod 都会收到。

我觉得我想要的是一个分组,比如“一个 NOTIFICATION 类型的接收者和一个 LOGGING 类型的接收者”,但我不确定如何实现它。消息路由模式似乎可以做到这一点,但我想知道是否可以完全使用 Spring 集成来实现。

一些代码。发件人的配置:

import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class MessagingConfig {      
    @Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://myactivemq:61616");
        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        template.setDefaultDestinationName("user-dest");
        return template;
    }     
}

发送消息的服务(简体):

@Service
public class MessageSender {  
  private final JmsTemplate jmsTemplate;

  @Autowired
  public MessageSender(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
  }

  public void sendMessage(String userId) {    
    jmsTemplate.send(new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException{
        return session.createTextMessage("NEW USER:" + userId);
      }
    });
  }
}

接收机配置:

import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class ReceiverConfig {

  private ActiveMQConnectionFactory getConnectionFactory() {
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
      connectionFactory.setBrokerURL("tcp://myactivemq:61616");
      return connectionFactory;
  }

  @Bean
  public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
      DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
      factory.setConnectionFactory(getConnectionFactory());
      factory.setConcurrency("1-1");
      return factory;
  }
}

以及接收消息的服务。

import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.jms.annotation.JmsListener;

@Service
public class MessageReceiver {

  @JmsListener(destination = "user-dest")
  public void receiveMessage(final TextMessage message) throws JMSException {
    // Do something with message.getText()
  }
}

这是有效的,但就像一个队列,只有一个收件人。知道一个 Notification Service pod 和一个 Logging Service pod 是如何接收这些信息的吗?

【问题讨论】:

标签: jms spring-integration activemq messaging spring-jms


【解决方案1】:

仅供参考。我最终使用了 RabbitMQ,可能是因为我发现文档更容易。使用 RabbitMQ,解决方案非常简单:

  • 将消息发送到交易所
  • 创建队列并将队列绑定到交换机
    • 我使用了管理 UI,但绑定也可以在代码中。
    • 在上面的示例中,我创建了两个队列,一个用于通知,一个用于日志记录。
  • 通知和日志服务监听各自的队列。

因此,用户服务向其 Exchange 发送一条消息,两个队列都接收它,并且每个侦听服务只有一个 pod 收到该消息。正是我想要的。

【讨论】:

    猜你喜欢
    • 2016-10-13
    • 1970-01-01
    • 2014-10-23
    • 2012-03-18
    • 1970-01-01
    • 2016-03-27
    • 2011-12-12
    • 2014-01-29
    • 2020-11-16
    相关资源
    最近更新 更多