【发布时间】:2020-12-28 16:15:12
【问题描述】:
高级架构
JMS(生产者/消费者) Artemis(STOMP) Websocket-Broker-Relay-Service STOMP-over-Websocket-client(生产者/消费者)
一些观察
-
在 STOMP 消费者中,通过 client-individual 确认订阅,无论我是 NACK 还是 ACK,Artemis 都会丢弃该消息。我希望将消息重新传递给同一消费者或任何其他消费者。有没有办法实现?
-
在 JMS 消费者中,如果在 Artemis 上接收到消息时消费者已关闭,则不会传递持久消息。我的期望是,一旦消费者服务再次恢复,持久消息将被传递。
class StompSessionHandlerImpl implements StompSessionHandler {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
session.setAutoReceipt(Boolean.FALSE);
StompHeaders headers1 = new StompHeaders();
headers1.setDestination("/queue/msg");
headers1.add("durable-subscription-name", messagingUtil.getServiceSubscriptionChannel());
headers1.add("Authorization", "Bearer ".concat(token));
headers1.setAck("client-individual");
session.subscribe(headers1, this);
}
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
session.acknowledge(Objects.requireNonNull(headers.getMessageId()), false);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
synchronized (StompSessionHandlerImpl.msgSenderLock) {
if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
initStompSession();
}
}
}
@Override
public Type getPayloadType(StompHeaders headers) {
return COMessage.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
if (payload == null) return;
COMessage msg = (COMessage) payload;
try {
stompMessagingService.handleReceivedMessages(msg);
self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), true);
} catch (Exception e) {
self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), false);
}
}
@PreDestroy
public void cleanUp() {
self.stompMessagingService.getStompSession().disconnect();
}
}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
@Bean
public WebSocketStompClient stompClient() {
WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
SockJsClient sockJsClient = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
converter.setObjectMapper(objectMapper);
stompClient.setMessageConverter(converter);
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10000);
scheduler.initialize();
stompClient.setTaskScheduler(scheduler);
stompClient.setDefaultHeartbeat(new long[]{20000, 20000});
stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
return stompClient;
}
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private String host;
private String password;
private String user;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/queue", "/topic", "/exchange")
.setRelayHost(host)
.setClientLogin(user)
.setClientPasscode(password)
.setSystemHeartbeatSendInterval(20000)
.setSystemLogin(user)
.setSystemPasscode(password)
.setUserDestinationBroadcast("/topic/unresolved-user")
.setUserRegistryBroadcast("/topic/log-user-registry");
config.setApplicationDestinationPrefixes("/device");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
registry.setErrorHandler(new StompSubProtocolErrorHandler());
}
@Bean
public DefaultSimpUserRegistry getDefaultSimpRegistry() {
return new DefaultSimpUserRegistry();
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(Integer.MAX_VALUE);
registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
registry.setTimeToFirstMessage(300000);
registry.setSendTimeLimit(300000);
registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
@Override
public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
return new EmaWebSocketHandlerDecorator(webSocketHandler);
}
});
}
}
class ArtemisConfig extends ArtemisAutoConfiguration {
@Bean("mqConnectionFactory")
public ConnectionFactory senderActiveMQConnectionFactory() {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
connectionFactory.setUser(user);
connectionFactory.setPassword(password);
connectionFactory.setConnectionTTL(-1L);
connectionFactory.setClientID(clientID);
connectionFactory.setEnableSharedClientID(true);
connectionFactory.setPreAcknowledge(Boolean.FALSE);
return connectionFactory;
}
@Bean("mqCachingConnectionFactory")
@Primary
public ConnectionFactory cachingConnectionFactory() {
return new CachingConnectionFactory(senderActiveMQConnectionFactory());
}
@Bean("jmsTemplate")
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
jmsTemplate.setMessageConverter(jsonMessageConverter);
jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
return jmsTemplate;
}
@PreDestroy
public void cleanUp() {
if (connection.isStarted()) {
try {
connection.close();
} catch (JMSException e) {
log.error("Failed to close the JMS connection {0}", e);
}
}
}
}
【问题讨论】:
标签: spring-boot jms stomp activemq-artemis