我已经写了很多有关InterruptedException和中断线程的文章 简而言之,如果您没有Future.cancel()调用Future.cancel()那么Future将终止待处理的get() ,但还将尝试中断基础线程。 这是一个非常重要的功能,可以更好地利用线程池。 我还写信总是比标准Future更喜欢CompletableFuture 事实证明,功能更强大的Future弟兄没有那么优雅地处理cancel() 考虑以下任务,我们稍后将在整个测试中使用以下任务:

class InterruptibleTask implements Runnable {
 
    private final CountDownLatch started = new CountDownLatch(1)
    private final CountDownLatch interrupted = new CountDownLatch(1)
 
    @Override
    void run() {
        started.countDown()
        try {
            Thread.sleep(10_000)
        } catch (InterruptedException ignored) {
            interrupted.countDown()
        }
    }
 
    void blockUntilStarted() {
        started.await()
    }
 
    void blockUntilInterrupted() {
        assert interrupted.await(1, TimeUnit.SECONDS)
    }
 
}

客户端线程可以检查InterruptibleTask以查看其是否已开始或被中断。 首先,让我们看一下InterruptibleTask对外部的cancel()反应:

def "Future is cancelled without exception"() {
    given:
        def task = new InterruptibleTask()
        def future = myThreadPool.submit(task)
        task.blockUntilStarted()
    and:
        future.cancel(true)
    when:
        future.get()
    then:
        thrown(CancellationException)
}
 
def "CompletableFuture is cancelled via CancellationException"() {
    given:
        def task = new InterruptibleTask()
        def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
        task.blockUntilStarted()
    and:
        future.cancel(true)
    when:
        future.get()
    then:
        thrown(CancellationException)
}

到目前为止,一切都很好。 显然, FutureCompletableFuture工作方式几乎相同-在取消结果后检索结果会引发CancellationException 但是myThreadPool线程呢? 我以为它会被游泳池打断,从而被回收,我怎么了!

def "should cancel Future"() {
    given:
        def task = new InterruptibleTask()
        def future = myThreadPool.submit(task)
        task.blockUntilStarted()
    when:
        future.cancel(true)
    then:
        task.blockUntilInterrupted()
}
 
@Ignore("Fails with CompletableFuture")
def "should cancel CompletableFuture"() {
    given:
        def task = new InterruptibleTask()
        def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
        task.blockUntilStarted()
    when:
        future.cancel(true)
    then:
        task.blockUntilInterrupted()
}

第一个测试将普通的Runnable提交给ExecutorService并等待其启动。 稍后,我们取消Future并等待直到观察到InterruptedException 当基础线程被中断时, blockUntilInterrupted()将返回。 但是,第二次测试失败。 CompletableFuture.cancel()永远不会中断基础线程,因此尽管Future看起来好像已被取消,但后备线程仍在运行,并且sleep()不会抛出InterruptedException 错误或功能? 它已记录在案 ,因此很遗憾地提供一个功能:

参数: mayInterruptIfRunning –此值在此实现中无效,因为不使用中断来控制处理。

您说RTFM,但是为什么CompletableFuture这样工作? 首先,让我们研究“旧的” Future实现与CompletableFuture有何不同。 ExecutorService.submit()返回的FutureTask具有以下cancel()实现(我使用类似的非线程安全Java代码删除了Unsafe ,因此仅将其视为伪代码):

public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
        return false;
    state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                state = INTERRUPTED;
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

FutureTask具有一个遵循该状态图的state变量:

CompletableFuture不能被打断

在的情况下, cancel()我们可以进入CANCELLED状态或去INTERRUPTED通过INTERRUPTING 核心部分是我们拿runner线程(如果存在的,也就是说,如果当前正在执行的任务),我们尽量打断它。 该分支负责急切和强制中断已运行的线程。 最后,我们必须通知阻塞的所有线程Future.get()finishCompletion()这里无关紧要)。 因此,很明显,多大的Future取消已经运行的任务。 CompletableFuture呢? cancel()伪代码:

public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = false;
    if (result == null) {
        result = new AltResult(new CancellationException());
        cancelled = true;
    }
    postComplete();
    return cancelled || isCancelled();
}

令人失望的是,我们几乎没有将result设置为CancellationException ,而忽略了mayInterruptIfRunning标志。 postComplete()也有类似的作用finishCompletion() -在通知未来注册的所有悬而未决的回调。 它的实现相当令人不快(使用非阻塞式Treiber stack ),但是它绝对不会中断任何底层线程。

原因和含义

CompletableFuture情况下,有限的cancel()不是错误,而是设计决定。 CompletableFuture并非固有地绑定到任何线程,而Future几乎总是代表后台任务。 从零开始创建CompletableFuturenew CompletableFuture<>() )是完美的,其中根本没有要取消的底层线程。 我仍然不禁感到大多数CompletableFuture 都将具有关联的任务和后台线程。 在这种情况下, cancel()可能会出现故障。 我不再建议用CompletableFuture盲目地替换Future ,因为它可能会更改依赖cancel()的应用程序的行为。 这意味着CompletableFuture故意违反了Liskov替换原则 -这是需要考虑的严重问题。

翻译自: https://www.javacodegeeks.com/2015/03/completablefuture-cant-be-interrupted.html

相关文章: