有一个专门用于此目的的框架,称为Dexecutor,使用 Dexecutor,您可以根据图形对您的需求进行建模,在执行时,dexecutor 将负责以可靠的方式执行它。
例如:
@Test
public void testDependentTaskExecution() {
ExecutorService executorService = newExecutor();
ExecutionEngine<Integer, Integer> executionEngine = new DefaultExecutionEngine<>(executorService);
try {
DefaultDependentTasksExecutor<Integer, Integer> executor = new DefaultDependentTasksExecutor<Integer, Integer>(
executionEngine, new SleepyTaskProvider());
executor.addDependency(1, 2);
executor.addDependency(1, 2);
executor.addDependency(1, 3);
executor.addDependency(3, 4);
executor.addDependency(3, 5);
executor.addDependency(3, 6);
executor.addDependency(2, 7);
executor.addDependency(2, 9);
executor.addDependency(2, 8);
executor.addDependency(9, 10);
executor.addDependency(12, 13);
executor.addDependency(13, 4);
executor.addDependency(13, 14);
executor.addIndependent(11);
executor.execute(ExecutionConfig.NON_TERMINATING);
Collection<Node<Integer, Integer>> processedNodesOrder = Deencapsulation.getField(executor, "processedNodes");
assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
assertThat(processedNodesOrder).size().isEqualTo(14);
} finally {
try {
executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
}
private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
result.add(new Node<Integer, Integer>(1));
result.add(new Node<Integer, Integer>(2));
result.add(new Node<Integer, Integer>(7));
result.add(new Node<Integer, Integer>(9));
result.add(new Node<Integer, Integer>(10));
result.add(new Node<Integer, Integer>(8));
result.add(new Node<Integer, Integer>(11));
result.add(new Node<Integer, Integer>(12));
result.add(new Node<Integer, Integer>(3));
result.add(new Node<Integer, Integer>(13));
result.add(new Node<Integer, Integer>(5));
result.add(new Node<Integer, Integer>(6));
result.add(new Node<Integer, Integer>(4));
result.add(new Node<Integer, Integer>(14));
return result;
}
private ExecutorService newExecutor() {
return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}
private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {
public Task<Integer, Integer> provideTask(final Integer id) {
return new Task<Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer execute() {
if (id == 2) {
throw new IllegalArgumentException("Invalid task");
}
return id;
}
};
}
}
这是建模图
这意味着任务 #1 、12 和 11 将并行运行,一旦其中一个任务完成,它的依赖任务就会启动,例如,一旦任务#1 完成,它的依赖任务 #2 和 #3 就会启动