【问题标题】:Spring Boot and WebSockets - got connection lost when sending high throuput through secured websocket using Stomp protocolSpring Boot 和 WebSocket - 使用 Stomp 协议通过安全 websocket 发送高吞吐量时连接丢失
【发布时间】:2019-11-09 06:15:12
【问题描述】:

我有 2 个 WebSockets 端点:

registry.addEndpoint("/ws-handshake").withSockJS();
registry.addEndpoint("/api/admin/ws-handshake").withSockJS();

第一个没有身份验证,第二个使用 Spring Boot Security 保护,使用与 HTTP 安全相同的配置,并使用 OAuth2。授权标头用于连接到安全端点。

我使用了一个简单的for循环,因为这是一个POC但是在生产中会使用相同数量的数据,返回很多记录,这是通过使用Spring Boot提供的SimpMessageSendingOperations类来完成的:

    private SimpMessageSendingOperations messagingTemplate;

    @MessageMapping("/tail-topic")
    public void tailLogSendToTopic(@Payload WebSocketPoCPayload payload) throws InterruptedException {
        String topicName = "/topic/logentries";
        for (int i = 0; i < 3000; i++) {
            WebSocketPoCMessage message = new WebSocketPoCMessage(String.format("%d : returning payload %s", i, payload.getName()));
            messageTemplate.convertAndSend(topicName, message);
            // TimeUnit.MILLISECONDS.sleep(50);
        }
        log.info("done returning all messages");
    }

当我通过不安全的端点发送请求时,一切运行正常,我收到客户端的所有内容。

当我通过安全端点发送请求时,一切都运行良好,直到某个有效负载编号(例如 1499,始终是相同的有效负载编号)。 除非我取消注释超时,但需要超过 10 毫秒,否则我会失去连接,但在以后的数字上。

似乎我的安全设置在看到太多流量时断开了连接,我的安全配置中是否缺少某些内容?

【问题讨论】:

  • 您用于 messageTemplate 的对象是什么?看起来它可能是为了处理消息的异步性质,在您发送消息等待回复后,连接在后台保持打开状态。通过添加延迟,您可以让连接有足够的时间在达到某种缓冲区溢出并导致连接关闭之前跟上。
  • @DanielTung 这是 SimpMessageSendingOperations,我将编辑问题以澄清这一点
  • 我真的认为这与超载连接有关。我不相信每个发送操作都会等待操作完成后再进行下一个操作。
  • @DanielTung 谢谢你,我会看看那条路

标签: spring-boot spring-security websocket stomp sockjs


【解决方案1】:

我想我已经找到了解决这个问题的真正方法, 尽管框架是非阻塞的,但它仍然在发送每条消息后等待来自rabbitMQ stomp broker 的响应。 在高吞吐量下发生的情况是,有时响应会丢失并且它会永远挂起等待,在 StompBrokerRelayMessageHandler 中,在 future.get() 行中:

        try {
            ListenableFuture<Void> future = super.forward(message, accessor);
            if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
                future.get();
            }
            return future;
        }
        catch (Throwable ex) {
            throw new MessageDeliveryException(message, ex);
        }

对我有用的解决方案是在不等待响应的情况下发送消息,方法是添加消息头,如下所示:

    SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
    accessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
    accessor.setLeaveMutable(true);
    messagingTemplate.convertAndSend(destination, message, accessor.getMessageHeaders());

当然,在这种情况下,一些消息可能会丢失,但对于我的用例来说已经足够了。

另一个可行的解决方案是使用超时和单线程执行程序包装 convertAndSend 行,然后重试

干杯

【讨论】:

  • 如果你能有空错过一些消息,确实是一个可能的解决方案
【解决方案2】:

由于对此似乎没有回应,我将接受临时解决方法作为解决方案:

private SimpMessageSendingOperations messagingTemplate;

@MessageMapping("/tail-topic")
public void tailLogSendToTopic(@Payload WebSocketPoCPayload payload) throws InterruptedException {
    String topicName = "/topic/logentries";
    for (int i = 0; i < 3000; i++) {
        WebSocketPoCMessage message = new WebSocketPoCMessage(String.format("%d : returning payload %s", i, payload.getName()));
        messageTemplate.convertAndSend(topicName, message);
        TimeUnit.MILLISECONDS.sleep(50);
    }
    log.info("done returning all messages");
}

【讨论】:

    猜你喜欢
    • 2020-10-27
    • 1970-01-01
    • 2016-05-06
    • 2020-06-17
    • 1970-01-01
    • 2014-12-27
    • 1970-01-01
    • 2015-09-15
    • 2018-02-21
    相关资源
    最近更新 更多