【发布时间】:2014-10-26 01:30:21
【问题描述】:
这是我第一次用 Java 创建一个多线程应用程序,该应用程序将持续运行直到被取消,我在关闭/中断我的线程时遇到了麻烦。
我有一些线程与 Mediator 通信,该 Mediator 封装了 TransferQueue、ExecutorService,并促进生产线程和消费线程之间的通信。
我使用这个 Mediator 而不是 Future,因为 TransferQueue 在处理多个生产者的单个消费者时对块更友好(生产者线程可以随时 mediator.put(E e) 和消费者线程可以等待 E e = mediator.take() 获得可用的东西),我不想浪费 CPU 周期轮询。
该设计非常简洁、快速且有效,但我在中断 queue.take()、serverSocket.accept() 上的阻塞以及中断整个线程时遇到了麻烦。
制作人:
public class SocketProducer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
while (runnable) {
Socket socket = serverSocket.accept(); // blocks until connection
mediator.putIntoQueue(socket);
}
return null;
}
public void interrupt() {
// ?
runnable = false;
serverSocket.close();
// ?
}
}
和消费者:
private class SocketConsumer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private volatile boolean runnable = true;
public SomeConsumer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Void call() throws Exception {
while (runnable) {
Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
}
return null;
}
public void interrupt() {
// ?
runnable = false;
// ?
}
}
Colleague 接口只是扩展了 Callable,为 Mediator 提供了一些额外的能力来管理其生产者/消费者同事(即:调用:每个 colleague.interrupt())。
我尝试了很多方法,在各个地方抛出InterruptedException,在各个地方捕获InterruptedException,让线程将其Thread的实例返回给中介进行中断。我尝试过的一切都非常无效,以至于我感觉我错过了这个难题的关键部分。
到目前为止,我见过的最有效的方法是毒丸(如果队列没有在空插入时抛出 NPE,那就太好了),我尝试过的所有引入毒药泛型的方法都有由于 ClassCastException 失败(尝试将 Object 转换为 Socket,尝试实例化通用 Socket 等)。
我真的不知道从这里去哪里。我真的很希望能够按需干净地终止这些线程。
已完成的解决方案:
public class SocketProducer implements Colleague<Socket> {
private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName());
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
logger.info("Listening on port " + listeningPort);
while (runnable) {
try {
Socket socket = serverSocket.accept();
logger.info("Connected on port " + socket.getLocalPort());
mediator.putIntoQueue(socket);
} catch (SocketException e) {
logger.info("Stopped listening on port " + listeningPort);
}
}
return null;
}
public void interrupt() {
try {
runnable = false;
serverSocket.close();
} catch (IOException e) {
logger.error(e);
}
}
}
public class SocketConsumer implements Colleague<Socket> {
private static final Logger logger = getLogger(SocketConsumer.class.getName());
private Mediator<Socket> socketMediator;
public SocketConsumer(Mediator<Socket> mediator) {
this.socketMediator = mediator;
}
public Void call() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
try {
Socket socket = socketMediator.takeFromQueue();
logger.info("Received socket on port: " + socket.getLocalPort());
} catch (InterruptedException e) {
logger.info("Interrupted.");
Thread.currentThread().interrupt();
}
}
return null;
}
public void interrupt() {
Thread.currentThread().interrupt();
}
}
【问题讨论】:
-
我现在可以看到的第一个明显的事情是您在
runnable字段旁边缺少volatile。由于您试图从另一个线程中断一个线程,因此您需要确保字段更改的可见性。我也会说得很清楚boolean,而不是Boolean。 -
也许我可以将 Socket 和 ServerSocket 包装到它们自己的类中,这样我就可以为它们创建独特的毒丸实例?
-
如果你调用
ServerSocket#close(),那么阻塞的accept()调用会抛出一个异常......这有帮助吗? -
终止整个JVM不是一个选项?
标签: java multithreading sockets