【发布时间】:2014-04-17 15:59:37
【问题描述】:
背景
我正在做一个概念验证,它由一个使用 websocket 通信的服务器-客户端系统组成。我使用了一个小型码头解决方案(jetty-all-9.1.3.v20140225.jar 和 servlet-api-3.1.jar)。我拥有 PoC 所需的大部分基本功能。
我有 2 节课:
- TestServer(使用 main 函数创建服务器实例,参见代码)
- ClientSocket(为每个客户端实例化的 WebSocket 对象)
问题
我想讨论的问题与广播客户端断开连接有关。服务器将所有 ClientSockets 实例保存在一个数组中,ClientSockets 在其“onConnect”函数中添加到该数组中。
系统稍后将限制广播到客户端组,但对于 PoC,所有连接的客户端都应获得广播。如果一个客户端断开连接,我想向所有其他客户端发送通知(“myClient has disconnected.”或类似内容)。
为此,我在服务器中实现了一个广播功能,该功能循环遍历客户端列表,将此信息发送给所有连接的客户端,但断开连接的客户端除外。此功能还用于通知所有客户端有关其他事情,例如新连接,并且在客户端断开连接同时有人连接或广播某些内容的情况下,这个问题也很可能发生在这里。
这个问题很容易通过连接几个(10+)个客户端(我在 js 中完成)然后同时断开它们来产生。如果我这样做,我总是会遇到并发错误,对于很少的客户端 (2-3),它有时会起作用,这取决于我猜的时间。
问题
考虑到任何客户端都可以随时(异步)断开连接,我应该如何处理向所有其他客户端广播的任务?我可以这样做而不产生异常吗?由于它是异步的,除了处理发生的异常之外,我看不到任何其他方法。非常感谢任何建议。
代码
TestServer.java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
public class TestServer {
private static final TestServer testServer = new TestServer();
private ArrayList<ClientSocket> clients = new ArrayList<>();
public static void main(String[] args) throws Exception {
int port = 8080;
Server server = new Server(port);
WebSocketHandler wsHandler = new WebSocketHandler() {
@Override
public void configure(WebSocketServletFactory factory) {
factory.register(ClientSocket.class);
}
};
server.setHandler(wsHandler);
System.out.println("Starting server on port " + port + ".");
server.start();
server.join();
}
public static TestServer getServer() {
return testServer;
}
public void addClient(ClientSocket client) {
this.clients.add(client);
}
public void removeClient(ClientSocket client) {
this.clients.remove(client);
this.broadcast("disconnect " + client.id, client);
}
public void broadcast(String message, ClientSocket excludedClient) {
log("Sending to all clients: " + message);
for (ClientSocket cs : this.clients) {
if (!cs.equals(excludedClient) && cs.session.isOpen() && cs != null) {
try {
cs.session.getRemote().sendStringByFuture(message);
} catch (Exception e) {
log("Error when broadcasting to " + cs.id + " (" + cs.address + "):");
log(e.getMessage());
}
}
}
}
由于以这种方式循环遍历数组,如果你在过程中干预数组,通常是行不通的,所以我也尝试了这个广播功能:
public void broadcast(String message, ClientSocket excludedClient) {
log("Sending to all clients: " + message);
Iterator<ClientSocket> cs = this.clients.iterator();
while (cs.hasNext()) {
ClientSocket client = cs.next();
if (client != null) {
if (!client.equals(excludedClient) && client.session.isOpen()) {
try {
client.session.getRemote().sendStringByFuture(message);
} catch (Exception e) {
log("Error when broadcasting to " + client.id + " (" + client.address + "):");
log(e.getMessage());
}
}
}
}
但它并没有更好的工作,因为问题是如果另一个 ClientSocket 对象断开连接,因为第一个对象正在广播它的断开连接,那么数组可能会被异步干预。
ClientSocket.java
import java.io.IOException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@WebSocket(maxIdleTime=0)
public class ClientSocket {
TestServer server = TestServer.getServer();
Session session;
String id;
String address;
@OnWebSocketClose
public void onClose(Session session, int statusCode, String reason) {
server.log("Disconnected: " + this.id + "(" + this.address + ")" + " (statusCode=" + statusCode + ", reason=" + reason + ")");
server.removeClient(this);
}
@OnWebSocketError
public void onError(Session session, Throwable t) {
server.log(this.id + "(" + this.address + ") error: " + t.getMessage());
}
@OnWebSocketConnect
public void onConnect(Session session) {
this.session = session;
this.address = this.session.getRemoteAddress().getAddress().toString().substring(1);
this.id = this.address; //Until user is registered their id is their IP
server.log("New connection: " + this.address);
server.addClient(this);
try {
session.getRemote().sendString("Hello client with address " + this.address + "!");
} catch (IOException e) {
server.log("Error in onConnect for " + this.id + "(" + this.address + "): " + e.getMessage());
}
}
@OnWebSocketMessage
public void onMessage(Session session, String message) {
server.log("Received from " + this.id + "(" + this.address + "): " + message);
String replyMessage;
String[] commandList = message.split("\\s+");
switch (commandList[0].toLowerCase()) {
case "register":
if (commandList.length > 1) {
this.id = commandList[1];
replyMessage = "Registered on server as " + this.id;
server.broadcast(this.id + " has connected", this);
} else {
replyMessage = "Incorrect register message";
}
break;
default:
replyMessage = "echo " + message;
break;
}
server.log("Sending to " + this.id + "(" + this.address + "): " + replyMessage);
try {
session.getRemote().sendString(replyMessage);
} catch (IOException e) {
server.log("Error during reply in onMessage for " + this.id + "(" + this.address + "): " + e.getMessage());
}
}
}
为了完整起见,我粘贴了整个类,尽管我删除了 onMessage 开关中的一些案例。然而,需要注意的部分是 onConnect 和 onClose 函数,它们将从服务器的客户端数组中填充和删除客户端。
错误日志
[2014-04-17 17:40:17.961] Sending to all clients: disconnect testclient4
2014-04-17 17:40:17.962:WARN:ClientSocket:qtp29398564-17: Unhandled Error (closing connection)
org.eclipse.jetty.websocket.api.WebSocketException: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]
at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:99)
at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
at java.lang.Thread.run(Thread.java:744)
Caused by:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
at java.lang.Thread.run(Thread.java:744)
Caused by:
java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859)
at java.util.ArrayList$Itr.next(ArrayList.java:831)
at TestServer.broadcast(TestServer.java:61)
at TestServer.removeClient(TestServer.java:45)
at ClientSocket.onClose(ClientSocket.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71)
at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:68)
at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onClose(JettyAnnotatedEventDriver.java:122)
at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:125)
at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:302)
at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:163)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:92)
at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:66)
at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:210)
at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:219)
at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:257)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.read(AbstractWebSocketConnection.java:500)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:409)
at org.eclipse.jetty.io.AbstractConnection$1.run(AbstractConnection.java:505)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:607)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:536)
at java.lang.Thread.run(Thread.java:744)
[2014-04-17 17:40:17.97] testclient7(94.246.80.30) error: Cannot call method public void ClientSocket#onClose(org.eclipse.jetty.websocket.api.Session, int, java.lang.String) with args: [org.eclipse.jetty.websocket.common.WebSocketSession, java.lang.Integer, <null>]
当同时断开所有客户端时,有时会发生这种情况。我很确定这与 ClientSocket 对象在广播的同时消失有关。
【问题讨论】:
标签: java asynchronous websocket jetty