【问题标题】:How to interrupt Callable threads that are blocked inside a loop?如何中断循环内阻塞的可调用线程?
【发布时间】:2014-10-26 01:30:21
【问题描述】:

这是我第一次用 Java 创建一个多线程应用程序,该应用程序将持续运行直到被取消,我在关闭/中断我的线程时遇到了麻烦。

我有一些线程与 Mediator 通信,该 Mediator 封装了 TransferQueue、ExecutorService,并促进生产线程和消费线程之间的通信。

我使用这个 Mediator 而不是 Future,因为 TransferQueue 在处理多个生产者的单个消费者时对块更友好(生产者线程可以随时 mediator.put(E e) 和消费者线程可以等待 E e = mediator.take() 获得可用的东西),我不想浪费 CPU 周期轮询。

该设计非常简洁、快速且有效,但我在中断 queue.take()、serverSocket.accept() 上的阻塞以及中断整个线程时遇到了麻烦。


制作人:

public class SocketProducer implements Colleague<Socket> {
    private Mediator<Socket> mediator;
    private ServerSocket serverSocket;
    private Integer listeningPort;

    private volatile boolean runnable = true;

    public SocketProducer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Colleague<Socket> setListeningPort(Integer listeningPort) {
        this.listeningPort = listeningPort;
        return this;
    }

    public Void call() throws Exception {
        serverSocket = new ServerSocket(listeningPort, 10);

        while (runnable) {
            Socket socket = serverSocket.accept(); // blocks until connection

            mediator.putIntoQueue(socket);
        }

        return null;
    }

    public void interrupt() {
        // ?
        runnable = false;
        serverSocket.close();
        // ?
    }
}

和消费者:

private class SocketConsumer implements Colleague<Socket> {
    private Mediator<Socket> mediator;
    private volatile boolean runnable = true;

    public SomeConsumer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Void call() throws Exception {
        while (runnable) {
            Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
        }

        return null;
    }

    public void interrupt() {
        // ?
        runnable = false;
        // ?
    }
}

Colleague 接口只是扩展了 Callable,为 Mediator 提供了一些额外的能力来管理其生产者/消费者同事(即:调用:每个 colleague.interrupt())。

我尝试了很多方法,在各个地方抛出InterruptedException,在各个地方捕获InterruptedException,让线程将其Thread的实例返回给中介进行中断。我尝试过的一切都非常无效,以至于我感觉我错过了这个难题的关键部分。

到目前为止,我见过的最有效的方法是毒丸(如果队列没有在空插入时抛出 NPE,那就太好了),我尝试过的所有引入毒药泛型的方法都有由于 ClassCastException 失败(尝试将 Object 转换为 Socket,尝试实例化通用 Socket 等)。

我真的不知道从这里去哪里。我真的很希望能够按需干净地终止这些线程。


已完成的解决方案:

public class SocketProducer implements Colleague<Socket> {
    private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName());
    private Mediator<Socket> mediator;
    private ServerSocket serverSocket;
    private Integer listeningPort;

    private volatile boolean runnable = true;

    public SocketProducer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Colleague<Socket> setListeningPort(Integer listeningPort) {
        this.listeningPort = listeningPort;
        return this;
    }

    public Void call() throws Exception {
        serverSocket = new ServerSocket(listeningPort, 10);
        logger.info("Listening on port " + listeningPort);

        while (runnable) {
            try {
                Socket socket = serverSocket.accept();
                logger.info("Connected on port " + socket.getLocalPort());
                mediator.putIntoQueue(socket);
            } catch (SocketException e) {
                logger.info("Stopped listening on port " + listeningPort);
            }
        }

        return null;
    }

    public void interrupt() {
        try {
            runnable = false;
            serverSocket.close();
        } catch (IOException e) {
            logger.error(e);
        }
    }

}

public class SocketConsumer implements Colleague<Socket> {
    private static final Logger logger = getLogger(SocketConsumer.class.getName());
    private Mediator<Socket> socketMediator;

    public SocketConsumer(Mediator<Socket> mediator) {
        this.socketMediator = mediator;
    }

    public Void call() throws Exception {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Socket socket = socketMediator.takeFromQueue();
                logger.info("Received socket on port: " + socket.getLocalPort());
            } catch (InterruptedException e) {
                logger.info("Interrupted.");
                Thread.currentThread().interrupt();
            }
        }
        return null;
    }

    public void interrupt() {
        Thread.currentThread().interrupt();
    }

}

【问题讨论】:

  • 我现在可以看到的第一个明显的事情是您在runnable 字段旁边缺少volatile。由于您试图从另一个线程中断一个线程,因此您需要确保字段更改的可见性。我也会说得很清楚boolean,而不是Boolean
  • 也许我可以将 Socket 和 ServerSocket 包装到它们自己的类中,这样我就可以为它们创建独特的毒丸实例?
  • 如果你调用ServerSocket#close(),那么阻塞的accept()调用会抛出一个异常......这有帮助吗?
  • 终止整个JVM不是一个选项?

标签: java multithreading sockets


【解决方案1】:

我认为毒丸只会让事情变得更复杂,所以我会保持简单。

至于ServerSocketthis answer建议调用close()应该足以打断它。

至于BlockingQueue,消费者可以是这样的:

// you can use volatile flag instead if you like
while (!Thread.currentThread.isInterrupted()) {
    try {
        Object item = queue.take();
        // do something with item 
    } catch (InterruptedException e) {
        log.error("Consumer interrupted", e);
        Thread.currentThread().interrupt(); // restore flag
    }
}

然后在您的Mediator 中,您可以在消费者线程上调用interrupt()

【讨论】:

  • 关闭ServerSocket 不会产生任何IMO,因为TranferQueue.transfer(E)-method 似乎是阻塞序列。关闭套接字只会关闭套接字将提供的流,但不会从队列中删除套接字。因此,要么需要中断调解器(队列),要么可以使用轮询/阻塞超时方法代替阻塞语句。
  • @RomanVottner 实际上producer中有两个阻塞操作(socket和queue),都需要处理。
  • 我发誓我以前尝试过这个,但突然之间它工作得非常好。我想我希望找到一种适用于这两种情况的解决方案,但幸运的是我可以解决它。消费者干净利落地中断,但我必须使用可运行标志并关闭生产者上的套接字。它完全忽略了线程中断。
  • @AndrewLogvinov 啊,我错过了serverSocket.accept() 的阻塞,所以你当然是对的:)
【解决方案2】:

毒丸是直截了当的。

private static Socket poisonPill = new Socket();

public Void call() throws Exception {
    while (runnable) {
        Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
        if (socket == poisonPill) { 
            // quit the thread... 
        }
    }

    return null;
}

注意socket == poisonPill。这是一个相等性检查,它们是完全相同的实例,因此毒药就是这样工作的,但仍然是类型安全的。

【讨论】:

    猜你喜欢
    • 2013-07-07
    • 1970-01-01
    • 2013-11-16
    • 1970-01-01
    • 1970-01-01
    • 2013-07-23
    • 2014-07-01
    • 2013-10-05
    • 2012-12-30
    相关资源
    最近更新 更多