【发布时间】:2021-01-10 18:08:52
【问题描述】:
当 Callable 返回的结果与条件匹配时,有没有办法停止 Java ExecuterService?
我有以下算法(代码 A):
MyObject functionA(SomeObject someObject) {
for(int i = 0; i < 100; ++i) {
MyObject result = someFunction(i, someObject);
if (result != null) {
return result;
}
}
return null;
}
为了加快速度,我将代码更改为(代码 B):
MyObject functionB(SomeObject someObject) {
final ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
Callable<MyObject> newCallable = () -> {
return someFunction(finalI, someObject);
};
callableList.add(newCallable);
}
List<Future<MyObject>> futures = executorService.invokeAll(callableList);
for(int i = 0; i < futures.size(); ++i) {
Future<MyObject> future = futures.get(i);
if(future.get() != null) {
return future.get();
}
}
return null;
}
平均而言,函数 B 的运行速度是函数 A 的 5 倍(在 8 个内核上)。
这很好,但并不完美。在 functionA 中, someFunction() 在找到结果 != null 之前平均被调用了大约 20 次。在 functionB 中 someFunction() 总是被调用 100 次。
有没有办法
a) 停止 executorService,当第一个线程以结果 != null 结束时
或更好
b) 停止 executorService,当一个线程完成并且一个线程找到结果 != null 并且所有线程的 finalI 值低于找到结果 != null 的线程,都以 reuslt == null 完成
谢谢
------- 编辑 ---------
感谢 Alex Crazy 和 Saxon 的回答,我能够回答我的问题 A。
- 结果不符合我的条件的所有 Callables 都应该抛出异常
- 我应该使用 invokeAny 而不是 invokeAll
- 我应该使用 newFixedThreadPool 而不是 newCachedThreadPool
FixedThreadPool 的使用很重要,因为 CachedThreadPool 并行运行所有 100 个任务。 FixedThreadPool 仅运行您定义的并行任务。新任务只有在旧任务完成后才会开始。
如果您不检查您的可调用对象是否被中断(因为 invokeAny 找到了结果),那么所有可调用对象都将运行直到它们完成。 因此,您可以使用 invokeAny 更快地获得结果,但处理器的工作时间与使用 invokeAll 时一样长。
所以我的新函数如下所示:
MyObject functionC(SomeObject someObject) {
final ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
Callable<MyObject> newCallable = () -> {
MyObject result = someFunction(finalI, someObject);
if(result == null) {
throw new Exception();
}
return result;
};
callableList.add(newCallable);
}
try {
MyObject result = executorService.invokeAny(callableList);
return result;
} catch (InterruptedException | ExecutionException e) {
return null;
}
}
functionC 解决了我的问题 A,但它并不总是返回由 someFunction() 创建的 MyObject 对象,该对象具有 finalI 的最低输入值。
例如。 someFunction(3, someObject) 和 someFunction(6, someObject) 将返回一个不为空的 MyObject 对象。
在某些运行中,将返回 someFunction(3, someObject) 的结果,而在其他运行中,将返回 someFunction(6, someObject) 的结果。
它不是确定性的。
所以我写了新的类 ResultCallable
public abstract class ResultCallable<E> implements Callable<E> {
E result = null;
}
和功能D
MyObject functionD(SomeObject someObject) {
final ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
ResultCallable<MyObject> newCallable = () -> {
this.result = someFunction(finalI, someObject);
if(this.result == null) {
throw new Exception();
}
return this.result;
};
callableList.add(newCallable);
}
try {
MyObject result = executorService.invokeAny(callableList);
executorService.shutdownNow();
executorService.awaitTermination(5, TimeUnit.SECONDS);
for(int i = 0; i < callableList.size(); ++i) {
if(callableList.get(i).result != null) {
return callableList.get(i).result;
}
}
} catch (InterruptedException | ExecutionException e) {
return null;
}
return null;
}
如果someFunction(3, someObject) 和someFunction(6, someObject) 将返回一个不为空的 MyObject 对象,现在主线程在第一个有效结果后等待所有其他启动的任务完成(使用 newFixedThreadPool(8) 最多有 7 个其他正在运行的任务)。
因为 executorService 按照它们在 callableList 中的顺序运行可调用对象,所以所有 finalI 值低于返回第一个有效结果的可调用对象的可调用对象都已启动,并将在第 19 行之前完成。
这很好,但并不完美。
假设finalI = 6 的任务首先完成,而finalI 从0 到5 和7 的任务仍在运行。
然后因为 executorService.shutdownNow();没有其他任务将启动,主线程将等待所有正在运行的任务完成。
但是我只需要等待finalI在0到3之间的任务完成即可。任务 3 因为它也有一个结果 != null 和任务 0、1 和 2 来证明任务 3 下面没有结果。
我可以将 executorService.awaitTermination 和 for 循环替换为以下内容吗?
for(int i = 0; i < callableList.size(); ++i) {
awaitTermination(callableList.get(i));
if(callableList.get(i).result != null) {
return callableList.get(i).result;
}
}
awaitTermination(callableList.get(i)); 是伪代码。我找不到要等到可调用对象执行的函数。
----------- 编辑 2 ---------- --
我自己解决了。
我将 final CountDownLatch countDownLatch = new CountDownLatch(1); 添加到我的类 ResultCallable。
在this.result = someFunction(finalI, someObject); 之后我添加了this.countDownLatch.countDown();
而awaitTermination(callableList.get(i)); 被callableList.get(i).countDownLatch.await(); 取代
如果你有更好的想法,请告诉我。
【问题讨论】:
标签: java multithreading parallel-processing executorservice