【问题标题】:How can I detect a disconnected client with Spring 5 Reactive WebSocket如何使用 Spring 5 Reactive WebSocket 检测断开连接的客户端
【发布时间】:2017-12-15 17:17:29
【问题描述】:

我设法使用 Spring 5 Reactive WebSocket 支持 (Chapter 23.2.4) 创建了一个 WebSocketHandler。接收和发送一切正常。但是,我不知道如何检测客户端断开连接。当调试客户端断开连接时,它会在HttpServerWSOperations 类(netty.http.server 包)中的服务器端停止,在那里它确实检测到CloseWebSocketFrame

对于如何处理客户端断开连接有什么建议吗?

【问题讨论】:

    标签: java spring websocket spring-websocket spring-webflux


    【解决方案1】:

    我在响应式 org.springframework.web.reactive.socket.WebSocketHandler 中实现了一个关闭事件处理程序,如下所示:

    public Mono<Void> handle(final WebSocketSession session) {
        final String sessionId = session.getId();
        if(sessions.add(sessionId)) {  // add session id to set to keep a count of active sessions
            LOG.info("Starting WebSocket Session [{}]", sessionId);
            // Send the session id back to the client
            WebSocketMessage msg = session.textMessage(String.format("{\"session\":\"%s\"}", sessionId));
            // Register the outbound flux as the source of outbound messages
            final Flux<WebSocketMessage> outFlux = Flux.concat(Flux.just(msg), newMetricFlux.map(metric -> {
                LOG.info("Sending message to client [{}]: {}", sessionId, metric);
                return session.textMessage(metric);             
            }));
            // Subscribe to the inbound message flux
            session.receive().doFinally(sig -> {
                LOG.info("Terminating WebSocket Session (client side) sig: [{}], [{}]", sig.name(), sessionId);
                session.close();
                sessions.remove(sessionId);  // remove the stored session id
            }).subscribe(inMsg -> {
                LOG.info("Received inbound message from client [{}]: {}", sessionId, inMsg.getPayloadAsText());
            });
            return session.send(outFlux);
        }
        return Mono.empty();
    }
    

    newMetricFlux 字段是出站 websocket 消息的来源。挂钩关闭事件的技巧是入站消息通量上的 doFinally。当 websocket 客户端关闭时,入站通量终止。

    不过,由于某种原因,在 netty 通道关闭和执行 doFinally 回调之间存在 1 分钟的延迟。还不知道为什么。

    这是浏览器客户端连接并立即关闭的日志输出。请注意第 3 行和第 4 行之间的 60 秒延迟。

    2017-08-03 11:15:41.177 DEBUG 28505 --- [ctor-http-nio-2] r.i.n.http.server.HttpServerOperations   : New http connection, requesting read
    2017-08-03 11:15:41.294  INFO 28505 --- [ctor-http-nio-2] c.h.w.ws.NewMetricsWebSocketHandler      : Starting WebSocket Session [87fbe66]
    2017-08-03 11:15:48.294 DEBUG 28505 --- [ctor-http-nio-2] r.i.n.http.server.HttpServerOperations   : CloseWebSocketFrame detected. Closing Websocket
    2017-08-03 11:16:48.293  INFO 28505 --- [ctor-http-nio-2] c.h.w.ws.NewMetricsWebSocketHandler      : Terminating WebSocket Session (client side) sig: [ON_COMPLETE], [87fbe66]
    

    更新:2017 年 10 月 13 日:

    从 Spring 5 GA 开始,不存在上述延迟,我观察到我的回调在客户端关闭后立即被调用。不确定这是在哪个版本中修复的,但正如我所说,它已在 5.0 GA 中修复。

    【讨论】:

    • 太棒了!那解决了!还没有调查延迟。
    • 只有一个注释...记得在你的应用程序上添加@EnableWebFlux....否则将不起作用...
    【解决方案2】:

    借助afterConnectionClosed 方法,您可以检测到这一点。借助handleTransportError 方法,您甚至可以处理传输错误。这是一个代码sn-p:

    @Component
    public class YourHandler extends TextWebSocketHandler {
    
        private final static Logger logger = LoggerFactory.getLogger(YourHandler .class);
        private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
    
        @Override
        public void afterConnectionEstablished(WebSocketSession session) {
            logger.debug("Connected : " + session);
            sessions.add(session);
        }
    
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
            try {
                //check the close status if you want...
                if (!status.equals(CloseStatus.NORMAL)) {
                    session.close();
                }
            } catch (IOException e) {
                logger.error("Cannot close session on afterConnectionClosed ", e);
            }
            sessions.remove(session);
        }
    
        @Override
        public void handleTransportError(WebSocketSession session, Throwable exception) {
            logger.debug("error has occured with the following session {}", session);
            try {
                session.close();
            } catch (IOException e) {
                logger.error("Cannot close session on handleTransportError ", e);
            }
        }
    
        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
            logger.debug("Receive : " + message.getPayload());      
        }
    }
    

    【讨论】:

    • 谢谢,但您的解决方案适用于非反应式 websocketsession 和处理程序。 Spring 5 有一个包org.springframework.web.reactive.socket,其中驻留了反应式变体。反应式处理程序接口没有afterConnectionClosed 之类的东西。所以看看你的代码,这似乎是我想要的功能,但无法实现。再看一下我所指的那一章。注意:要启用 spring 响应式 Web,请将 spring-boot-starter-web 替换为 spring-boot-starter-webflux。或者去initializr并选择spring boot 2.0.0M2并添加Reactive Web
    猜你喜欢
    • 1970-01-01
    • 2018-08-28
    • 2022-01-03
    • 2020-01-15
    • 2014-04-14
    • 2010-11-15
    • 2015-11-21
    • 1970-01-01
    • 2020-01-27
    相关资源
    最近更新 更多