【问题标题】:How to redeliver a STOMP message to the consumer in case of any processing failure?如果处理失败,如何将 STOMP 消息重新传递给消费者?
【发布时间】:2020-12-28 16:15:12
【问题描述】:

高级架构

JMS(生产者/消费者) Artemis(STOMP) Websocket-Broker-Relay-Service STOMP-over-Websocket-client(生产者/消费者)

一些观察

  1. 在 STOMP 消费者中,通过 client-individual 确认订阅,无论我是 NACK 还是 ACK,Artemis 都会丢弃该消息。我希望将消息重新传递给同一消费者或任何其他消费者。有没有办法实现?

  2. 在 JMS 消费者中,如果在 Artemis 上接收到消息时消费者已关闭,则不会传递持久消息。我的期望是,一旦消费者服务再次恢复,持久消息将被传递。

class StompSessionHandlerImpl implements StompSessionHandler {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        session.setAutoReceipt(Boolean.FALSE);
        StompHeaders headers1 = new StompHeaders();
        headers1.setDestination("/queue/msg");
        headers1.add("durable-subscription-name", messagingUtil.getServiceSubscriptionChannel());
        headers1.add("Authorization", "Bearer ".concat(token));
        headers1.setAck("client-individual");
        session.subscribe(headers1, this);

    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        session.acknowledge(Objects.requireNonNull(headers.getMessageId()), false);
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        synchronized (StompSessionHandlerImpl.msgSenderLock) {
            if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
                initStompSession();
            }
        }
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return COMessage.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        if (payload == null) return;
        COMessage msg = (COMessage) payload;
     try {
        stompMessagingService.handleReceivedMessages(msg);
        self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), true);
       } catch (Exception e) {
           self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), false);
       }

    }


    @PreDestroy
    public void cleanUp() {
        self.stompMessagingService.getStompSession().disconnect();
    }

}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
    @Bean
    public WebSocketStompClient stompClient() {
        WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
        List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
        SockJsClient sockJsClient = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        converter.setObjectMapper(objectMapper);
        stompClient.setMessageConverter(converter);
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10000);
        scheduler.initialize();
        stompClient.setTaskScheduler(scheduler);
        stompClient.setDefaultHeartbeat(new long[]{20000, 20000});
        stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
        ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
        return stompClient;
    }
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private String host;

    private String password;

    private String user;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/queue", "/topic", "/exchange")
                .setRelayHost(host)
                .setClientLogin(user)
                .setClientPasscode(password)
                .setSystemHeartbeatSendInterval(20000)
                .setSystemLogin(user)
                .setSystemPasscode(password)
                .setUserDestinationBroadcast("/topic/unresolved-user")
                .setUserRegistryBroadcast("/topic/log-user-registry");
        config.setApplicationDestinationPrefixes("/device");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
        registry.setErrorHandler(new StompSubProtocolErrorHandler());
    }

    @Bean
    public DefaultSimpUserRegistry getDefaultSimpRegistry() {
        return new DefaultSimpUserRegistry();
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(Integer.MAX_VALUE);
        registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
        registry.setTimeToFirstMessage(300000);
        registry.setSendTimeLimit(300000);
        registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
                return new EmaWebSocketHandlerDecorator(webSocketHandler);
            }
        });

    }

}
class ArtemisConfig extends ArtemisAutoConfiguration {

    @Bean("mqConnectionFactory")
    public ConnectionFactory senderActiveMQConnectionFactory() {

        ActiveMQConnectionFactory connectionFactory =
               new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
        connectionFactory.setUser(user);
        connectionFactory.setPassword(password);
        connectionFactory.setConnectionTTL(-1L);
        connectionFactory.setClientID(clientID);
        connectionFactory.setEnableSharedClientID(true);
        connectionFactory.setPreAcknowledge(Boolean.FALSE);
        return connectionFactory;
    }

    @Bean("mqCachingConnectionFactory")
    @Primary
    public ConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(senderActiveMQConnectionFactory());
    }

    @Bean("jmsTemplate")
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setMessageConverter(jsonMessageConverter);
        jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
        jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
        jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
        return jmsTemplate;
    }

    @PreDestroy
    public void cleanUp() {
        if (connection.isStarted()) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.error("Failed to close the JMS connection {0}", e);
            }
        }
    }

}

【问题讨论】:

    标签: spring-boot jms stomp activemq-artemis


    【解决方案1】:

    当使用 ActiveMQ Artemis 时,一个 STOMP ACK 帧告诉代理该消息已被成功消费,因此应将其从队列中删除。 STOMP NACK 帧告诉代理该消息没有被成功消费,因此代理将丢弃它。 STOMP 规范未指定此处的确切行为。它只说:

    NACKACK 相反。它用于告诉服务器客户端没有消费该消息。然后,服务器可以将消息发送到不同的客户端,将其丢弃,或将其放入死信队列。确切的行为是特定于服务器的。

    NACK 采用与ACK 相同的标头:id(必需)和transaction(可选)。

    NACK 适用于单个消息(如果订阅的确认模式为client-individual)或之前发送但尚未确认或确认的所有消息(如果订阅的确认模式为client)。

    如果您希望重新传递消息,则不应确认或取消消息,并且当消费者的连接关闭时,消息将被放回队列中以传递给另一个(或相同的)客户端。

    我希望将来这种行为是可配置的。

    【讨论】:

    • 感谢您的回复。这些消息是时间关键的。如果我们不 NACK/ACK,在这种情况下,要重新传递消息,连接 TTL 需要非常低。而且即使connection-TTL很低,只要有新的传入消息,连接就会保持活动状态,在这种情况下,消息将被卡在等待连接关闭的队列中。你有什么建议?还是我错过了什么?
    • 我不清楚 connection-TTL 在这里进入图片的位置,我不明白为什么到达的新消息会强制连接保持活动状态。如果您的应用程序需要关闭连接以触发重新传递,它可以简单地关闭连接,不是吗?应用程序控制自己的连接。至少,在大多数用例中都是这样吗?你的用例有什么不同吗?
    • Apache ActiveMQ Artemis 通过连接 TTL 使所有这些都可配置。基本上,TTL 决定了在没有来自客户端的任何数据的情况下,服务器将保持连接活动的时间。客户端会定期自动发送“ping”数据包,以防止服务器关闭它。如果服务器在连接 TTL 时间内没有收到任何连接上的数据包,那么它将自动关闭服务器上与该连接相关的所有会话从文档中的这一段,我明白了。
    • 虽然,根据您的最后评论,我理解,您的意思是在失败的情况下,我应该故意关闭连接并从消费者重新连接作为解决方法。如果可以按照您在回复中提到的那样配置此行为,那就太好了。
    • 连接 TTL 的存在是为了确保代理关闭连接并清理任何服务器端资源,以防客户端或网络故障或行为不端的客户端根本没有正确关闭其连接。它与这个用例没有任何关系。如前所述,您应该故意关闭连接以触发重新传递。至于使行为可配置,请考虑提供解​​决方案。毕竟,ActiveMQ Artemis 是开源的,代码贡献总是受欢迎的。
    猜你喜欢
    • 2020-09-20
    • 1970-01-01
    • 1970-01-01
    • 2020-12-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-03-13
    • 2019-05-08
    相关资源
    最近更新 更多