【发布时间】:2017-06-29 08:23:50
【问题描述】:
当我们系统中的用户被创建时,我正在尝试使用消息传递服务向其他服务发送消息。接收消息的服务是负载平衡的 Spring Boot 应用程序(在我们的例子中,是 Kubernetes 环境中的 pod)。
我想要:
每个服务中的每个 pod 都配置为接收消息。
每个服务中的一个(并且只有一个)pod 实际接收每条消息。
这样消息可以被多个服务接收。
所以在图中,Notification Service 中的一个 pod 和 Logging Service 中的一个 pod 接收到“New User”消息。
我已经设置了 ActiveMQ 和 Spring Integration 来发送/接收消息,并将其作为 a) 队列(一个接收者)和 b) 主题(订阅者接收它)。问题是:
a) 对于队列,一个接收者意味着 Notifications 会收到它,但 Logging 不会(反之亦然)。
b) 对于一个主题,订阅者意味着所有六个 pod 都会收到。
我觉得我想要的是一个分组,比如“一个 NOTIFICATION 类型的接收者和一个 LOGGING 类型的接收者”,但我不确定如何实现它。消息路由模式似乎可以做到这一点,但我想知道是否可以完全使用 Spring 集成来实现。
一些代码。发件人的配置:
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;
@Configuration
public class MessagingConfig {
@Bean
public ActiveMQConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://myactivemq:61616");
return connectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setDefaultDestinationName("user-dest");
return template;
}
}
发送消息的服务(简体):
@Service
public class MessageSender {
private final JmsTemplate jmsTemplate;
@Autowired
public MessageSender(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sendMessage(String userId) {
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException{
return session.createTextMessage("NEW USER:" + userId);
}
});
}
}
接收机配置:
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
@Configuration
@EnableJms
public class ReceiverConfig {
private ActiveMQConnectionFactory getConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://myactivemq:61616");
return connectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}
}
以及接收消息的服务。
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.jms.annotation.JmsListener;
@Service
public class MessageReceiver {
@JmsListener(destination = "user-dest")
public void receiveMessage(final TextMessage message) throws JMSException {
// Do something with message.getText()
}
}
这是有效的,但就像一个队列,只有一个收件人。知道一个 Notification Service pod 和一个 Logging Service pod 是如何接收这些信息的吗?
【问题讨论】:
标签: jms spring-integration activemq messaging spring-jms