【问题标题】:Java Jetty WebSocket Server: Handle broadcasts with asynchronously disconnecting clientsJava Jetty WebSocket 服务器:使用异步断开客户端处理广播
【发布时间】:2014-04-17 15:59:37
【问题描述】:

背景

我正在做一个概念验证,它由一个使用 websocket 通信的服务器-客户端系统组成。我使用了一个小型码头解决方案(jetty-all-9.1.3.v20140225.jar 和 servlet-api-3.1.jar)。我拥有 PoC 所需的大部分基本功能。

我有 2 节课:

  • TestServer(使用 main 函数创建服务器实例,参见代码)
  • ClientSocket(为每个客户端实例化的 WebSocket 对象)

问题

我想讨论的问题与广播客户端断开连接有关。服务器将所有 ClientSockets 实例保存在一个数组中,ClientSockets 在其“onConnect”函数中添加到该数组中。

系统稍后将限制广播到客户端组,但对于 PoC,所有连接的客户端都应获得广播。如果一个客户端断开连接,我想向所有其他客户端发送通知(“myClient has disconnected.”或类似内容)。

为此,我在服务器中实现了一个广播功能,该功能循环遍历客户端列表,将此信息发送给所有连接的客户端,但断开连接的客户端除外。此功能还用于通知所有客户端有关其他事情,例如新连接,并且在客户端断开连接同时有人连接或广播某些内容的情况下,这个问题也很可能发生在这里。

这个问题很容易通过连接几个(10+)个客户端(我在 js 中完成)然后同时断开它们来产生。如果我这样做,我总是会遇到并发错误,对于很少的客户端 (2-3),它有时会起作用,这取决于我猜的时间。

问题

考虑到任何客户端都可以随时(异步)断开连接,我应该如何处理向所有其他客户端广播的任务?我可以这样做而不产生异常吗?由于它是异步的,除了处理发生的异常之外,我看不到任何其他方法。非常感谢任何建议。

代码

TestServer.java

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

public class TestServer {
    private static final TestServer testServer = new TestServer();
    private ArrayList<ClientSocket> clients = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        int port = 8080;
        Server server = new Server(port);
        WebSocketHandler wsHandler = new WebSocketHandler() {
            @Override
            public void configure(WebSocketServletFactory factory) {
                factory.register(ClientSocket.class);
            }
        };
        server.setHandler(wsHandler);
        System.out.println("Starting server on port " + port + ".");
        server.start();
        server.join();
    }

    public static TestServer getServer() {
        return testServer;
    }

    public void addClient(ClientSocket client) {
        this.clients.add(client);
    }

    public void removeClient(ClientSocket client) {
        this.clients.remove(client);
        this.broadcast("disconnect " + client.id, client);  
    }

    public void broadcast(String message, ClientSocket excludedClient) {
        log("Sending to all clients: " + message);
        for (ClientSocket cs : this.clients) {
            if (!cs.equals(excludedClient) && cs.session.isOpen() && cs != null) {
                try {
                    cs.session.getRemote().sendStringByFuture(message);
                } catch (Exception e) {
                    log("Error when broadcasting to " + cs.id + " (" + cs.address + "):");
                    log(e.getMessage());
                }
            }
        }
    }

由于以这种方式循环遍历数组,如果你在过程中干预数组,通常是行不通的,所以我也尝试了这个广播功能:

    public void broadcast(String message, ClientSocket excludedClient) {
        log("Sending to all clients: " + message);
        Iterator<ClientSocket> cs = this.clients.iterator();
        while (cs.hasNext()) {
            ClientSocket client = cs.next();
            if (client != null) {
                if (!client.equals(excludedClient) && client.session.isOpen()) {
                try {
                    client.session.getRemote().sendStringByFuture(message);
                } catch (Exception e) {
                    log("Error when broadcasting to " + client.id + " (" + client.address + "):");
                    log(e.getMessage());
                }
            }
        }
    }

但它并没有更好的工作,因为问题是如果另一个 ClientSocket 对象断开连接,因为第一个对象正在广播它的断开连接,那么数组可能会被异步干预。

ClientSocket.java

import java.io.IOException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

@WebSocket(maxIdleTime=0)
public class ClientSocket {
    TestServer server = TestServer.getServer();
    Session session;
    String id;
    String address;

    @OnWebSocketClose
    public void onClose(Session session, int statusCode, String reason) {
        server.log("Disconnected: " + this.id + "(" + this.address + ")" + " (statusCode=" + statusCode + ", reason=" + reason + ")");
        server.removeClient(this);
    }

    @OnWebSocketError
    public void onError(Session session, Throwable t) {
        server.log(this.id + "(" + this.address + ") error: " + t.getMessage());
    }

    @OnWebSocketConnect
    public void onConnect(Session session) {
        this.session = session;
        this.address = this.session.getRemoteAddress().getAddress().toString().substring(1);
        this.id = this.address; //Until user is registered their id is their IP
        server.log("New connection: " + this.address);
        server.addClient(this);
        try {
            session.getRemote().sendString("Hello client with address " + this.address + "!");
        } catch (IOException e) {
            server.log("Error in onConnect for " + this.id + "(" + this.address + "): " + e.getMessage());
        }
    }

    @OnWebSocketMessage
    public void onMessage(Session session, String message) {
        server.log("Received from " + this.id + "(" + this.address + "): " + message);
        String replyMessage;
        String[] commandList = message.split("\\s+");
        switch (commandList[0].toLowerCase()) {
            case "register":
                if (commandList.length > 1) {
                    this.id = commandList[1];
                    replyMessage = "Registered on server as " + this.id;
                    server.broadcast(this.id + " has connected", this);
                } else {
                    replyMessage = "Incorrect register message";
                }
                break;
            default:
                replyMessage = "echo " + message;
                break;
        }
        server.log("Sending to " + this.id + "(" + this.address + "): " + replyMessage);
        try {
            session.getRemote().sendString(replyMessage);
        } catch (IOException e) {
            server.log("Error during reply in onMessage for " + this.id + "(" + this.address + "): " + e.getMessage());
        }
    }
}

为了完整起见,我粘贴了整个类,尽管我删除了 onMessage 开关中的一些案例。然而,需要注意的部分是 onConnect 和 onClose 函数,它们将从服务器的客户端数组中填充和删除客户端。

错误日志

[2014-04-17 17:40:17.961] Sending to all clients: disconnect testclient4
2014-04-17 17:40:17.962:WARN:ClientSocket:qtp29398564-17: Unhandled Error (closing connection)
org.eclipse.jetty.websocket.api.WebSocketException: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]
        at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:99)
        at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
        at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
        at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
        at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
        at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
        at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
        at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
        at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
        at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
        at java.lang.Thread.run(Thread.java:744)
Caused by:
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
        at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
        at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
        at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
        at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
        at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
        at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
        at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
        at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
        at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
        at java.lang.Thread.run(Thread.java:744)
Caused by:
java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859)
        at java.util.ArrayList$Itr.next(ArrayList.java:831)
        at TestServer.broadcast(TestServer.java:61)
        at TestServer.removeClient(TestServer.java:45)
        at ClientSocket.onClose(ClientSocket.java:22)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
        at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
        at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
        at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
        at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
        at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
        at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
        at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
        at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
        at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
        at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
        at java.lang.Thread.run(Thread.java:744)
[2014-04-17 17:40:17.97] testclient7(94.246.80.30) error: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]

当同时断开所有客户端时,有时会发生这种情况。我很确定这与 ClientSocket 对象在广播的同时消失有关。

【问题讨论】:

    标签: java asynchronous websocket jetty


    【解决方案1】:

    替换:

    private ArrayList<ClientSocket> clients = new ArrayList<>();
    

    与:

    private List<ClientSocket> clients = new CopyOnWriteArrayList<>();
    

    在您的广播方法中,只需使用 for each 语句:

    for( ClientSocket client : clients )
    {
      if ( !client.equals(excludedClient) && client.session.isOpen() )
      {
        // Broadcast...
      }
    }
    

    这将为您提供线程安全的迭代。发生 ConcurrentModificationException 是因为您在迭代 List 的同时修改了 List。 CopyOnWriteArrayList 在写入时复制内部数组,因此修改不会干扰迭代。当然,这会增加一些额外的开销来改变列表,因此您可能需要考虑另一种确保线程安全的方法,但是我怀疑这足以满足您的需求。如果读取比写入更常见(通常是这种情况),那么“写入时复制”数据结构就可以了。

    【讨论】:

    • 我换了公司,不再从事这个项目,但作为更新,我在需要的地方手动复制了数组,这很可能会得到与这个答案相同的结果?除此之外,我还相应地处理了最终异常,因为在这种异步情况下,对象很可能会消失。即使我无法验证我所拥有的异常是否会随着这个答案消失,但它仍然是所提出问题的有效答案。只要确保通过处理最终异常来覆盖对象消失/断开连接。
    • 感谢您回来。我不确定您所说的“手动复制数组”是什么意思——这仍然会带来问题。 CopyOnWriteArrayList 不会改变内部数组 - 它会在每次写入时创建一个新数组,因此任何活动迭代都在不会更改的数组上执行。对象不能“消失”,因为它们被迭代器引用 - 即使在列表中调用了 remove(),您仍然会在迭代中看到它们。 ClientSocket 会话可能会在开始迭代和发送消息之间终止,因此您需要允许这样做。
    • 你是对的。我的意思是,每次我迭代它时,我都会创建一个新列表作为原始列表的副本,以确保我迭代的列表在迭代期间不会被其他线程更改(删除对象)。我所说的对象消失的意思是,已建立的 websocket 会话可能会关闭,因为任何人都可以退出或连接中断。所以正如你所说,这需要以某种方式处理。所以你的评论和回答是 100% 正确的,我只是马虎,对我评论中的措辞没有足够的思考。 :)
    • 谢谢。我的观点是,即使复制列表也可能会出现间歇性问题,因为您仍然必须遍历原始列表/数组来制作副本。您所做的只是通过在迭代期间做更少的工作来减少线程竞争可能发生的时间,但它们仍然可能发生。使用写时复制可以防止这种情况发生。
    • 您好,如果您使用 ArrayList copy = new ArrayList(original) 复制一个数组,如果另一个线程正在从“原始”列表中删除一个元素,这会引发异常吗?如果是这样,在制作副本时添加同步(原始)块会解决并发问题吗?
    猜你喜欢
    • 2018-11-14
    • 1970-01-01
    • 2016-02-24
    • 1970-01-01
    • 1970-01-01
    • 2012-08-07
    • 2017-09-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多