【发布时间】:2013-12-11 21:04:39
【问题描述】:
我尝试使用 CompletionService 并行执行多个任务。当我尝试实施取消时,问题就出现了。
这是我使用的代码草图:
void startTasks(int numberOfTasks) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
CompletionService<TaskResultType> completionService = new ExecutorCompletionService<TaskResultType>(executor);
ConcurrentLinkedQueue<TaskResultType> results = new ConcurrentLinkedQueue<BenchmarkResult>();
ArrayList<Future> futures = new ArrayList<Future>();
for (int i = 0; i < numberOfTasks ; i++) {
TypeOfTask task = ... ;
Future future = completionService.submit(task);
futures.add(future);
}
boolean failed = false;
Throwable cause = null;
for (int i = 0; i < numberOfThreads; i++) {
try {
Future<TaskResultType> resultFuture = completionService.take();
TaskResultType result = resultFuture.get();
results.add(result);
} catch (ExecutionException e) {
failed = true;
cause = e.getCause();
/* cancel all other running tasks in case of failure in one task */
for (Future future : futures) {
future.cancel(true);
}
} catch (CancellationException e) {
// consume (planned cancellation from calling future.cancel())
}
}
executor.shutdown();
// code to throw an exception using cause
}
任务实现Callable。
当我现在在大多数情况下在其中一个任务中抛出异常时,它运行良好,即我立即从其他任务中获取 CancellationExceptions,并且任务立即完成(让我们称之为案例 A)。但有时(我们称之为案例 B),一些任务先完成然后抛出 CancellationException。 Future.cancel(true) 在这两种情况下对所有任务都返回 true(除了具有初始 ExecutionException 的那个,因为这个已经被取消了)。
我使用 Thread.currentThread.isInterrupted() 检查中断标志,在完成的任务(即取消不成功的任务)中,中断标志设置为 false。
在我看来,这一切似乎都是非常奇怪的行为。有人知道问题可能是什么吗?
更新
到目前为止我最好的想法是,在包含任务的代码深处的某个地方(只有一些高级代码来自我自己),中断状态被消耗,例如通过捕获的 InterruptedException 不调用 Thread.interrupt() 来重新建立状态。由于线程的调度,Future.cancel() 设置中断标志的确切时间可能会略有不同,这可以解释不一致的行为。消费中断状态能否解释案例 B 的行为?
【问题讨论】:
-
如何在已经完成的作业中测试中断标志?如果它已经完成,那么没有代码正在运行,对吧?
-
没有看到您任务中的代码,我们帮不了您
-
我怀疑您的问题是其中一项任务正在执行 IO 或以其他方式被阻止并且未检查
Thread.currentThread.isInterrupted()。因此,即使您取消了Future并且线程被中断,也不会被检测到。你的程序挂起还是完成?如果它挂起,你可以做一个线程转储来查看哪些线程被挂起? -
@Gray 不,我正在检查,而任务仍在运行。
-
@Gray 两种情况下程序都结束
标签: java multithreading