【发布时间】:2015-12-01 18:22:03
【问题描述】:
我正在研究 CompletionService 类,我发现将提交队列与完成队列解耦非常有用。
但我也错过了一种轮询/取消任务的方法(在某种程度上可以被认为已完成)。是否可以轻松完成?
Future<String> task1Future = completionService.submit(myCallableTask1);
Future<String> task2Future = completionService.submit(myCallableTask2);
task2Future.cancel();
Future<String> lastComplTaskFuture = completionService.take();
//Seems to return only the completed or
//interrupted tasks, not those cancelled (task2)
编辑:在检查了一些答案后,我意识到发生了什么。 CompletitionService 以与提交的作业相同的顺序返回。 如果您运行作业 a、b 和 c;在 a 工作时取消 b 和 c;最后轮询completionService,直到a taks 终止才会通知b 和c 的取消。 另外,我意识到,如果您关闭执行程序而不是取消单个任务,那么仍在队列中的那些任务不会到达完成服务,即使它们的取消已完成,而活动任务尚未结束。
EDIT2:好的,我添加了一个完整的测试用例
import java.util.concurrent.*;
public class CompletionTest {
public static void main(String[] args){
CompletionService<String> completion =
new ExecutorCompletionService<String>(Executors.newSingleThreadExecutor());
Future<String> aFuture =
completion.submit(() -> {Thread.sleep(10000); return "A";});
completion.submit(() -> {return "B";}).cancel(true);
completion.submit(() -> {return "C";}).cancel(true);
completion.submit(() -> {return "D";}).cancel(true);
long arbitraryTime = System.currentTimeMillis();
while (true){
String result = getNextResult(completion);
System.out.println(result);
if (System.currentTimeMillis() > arbitraryTime+5000){
aFuture.cancel(true);
}
}
}
public static String getNextResult(CompletionService<String> service){
try {
String taskId = service.take().get();
return "'"+taskId+"' job completed successfully.";
}
catch (InterruptedException e ) { return "Unknown job was interrupted/stopped while waiting (no thread.wait() or infinite loop in the job so I guess this should not be possible)."; }
catch (CancellationException e) { return "Unknown job was cancelled."; }
catch (ExecutionException e) {
return "Unknown job returned with the following internal exception while running: " + e.getCause().getMessage();
}
}
}
我期望的输出如下:
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
'A' job completed successfully.
但实际上是:
'A' job completed successfully.
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
我什至尝试使用 PriorityBlockingQueue 作为 CompletionService 的队列,使用 Comparator.<Future<String>,Boolean>comparing(Future::isCancelled).reversed() 但它也没有工作(我猜如果元素改变状态它不会使用)
【问题讨论】:
-
您的代码对我来说可以正常工作并返回已取消的
Future。我可能已经做出了假设(例如,cancel需要boolean参数)。请发布 MCVE。 -
@Sotirios Delimanolis 但它是返回您的“中断”取消期货(即在底层任务活动时中断)还是“丢弃”取消期货(即在排队但尚未运行时取消)?如果是后者,那我的队列或线程池设置一定有问题
-
两者都有。
Future的isCancelled都返回true。 -
@Sotirios Delimanolis 请检查我的编辑,我认为我们所说的每个前提都有些道理
-
我无法重现 如果您运行作业 a、b 和 c;在 a 工作时取消 b 和 c;最后轮询completionService,直到a taks 终止才会通知b 和c 的取消。。请提供发生这种情况的 MCVE,并为您的另外,[...] 提供另一个 MCVE。
标签: java java-8 future java.util.concurrent completion-service