这应该由finishAllQuery 自己处理。例如,从 Java 9 开始,您可以使用
public static <T> CompletableFuture<List<T>> finishAllQuery(
List<CompletableFuture<T>> futures, long timeOut, TimeUnit unit) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.completeOnTimeout(null, timeOut, unit)
.thenApply(v -> futures.stream()
.filter(CompletableFuture::isDone)
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
使用completeOnTimeout,我们可以在超时后使用预定义的值强制完成未来。这里我们只使用null,因为allOf的结果值无论如何都无所谓。
我们只需要添加一个过滤条件,跳过所有尚未完成的futures,否则join会阻塞线程。
这可以像这样使用
CompletableFuture<List<Result>> allResponse
= finishAllQuery(futures, 5, TimeUnit.MILLISECONDS);
List<Result> list = allResponse.join(); // will wait at most 5 milliseconds
对于 Java 8,我们可以使用
static <T> CompletableFuture<T> completeOnTimeout(
CompletableFuture<T> cf, T value, long timeOut, TimeUnit unit) {
ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<Boolean> job = e.schedule(() -> cf.complete(value), timeOut, unit);
return cf.whenComplete((x,y) -> { job.cancel(false); e.shutdown(); });
}
对于缺少的功能,需要稍微重写:
public static <T> CompletableFuture<List<T>> finishAllQuery(
List<CompletableFuture<T>> futures, long timeOut, TimeUnit unit) {
return completeOnTimeout(
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])),
null, timeOut, unit)
.thenApply(v -> futures.stream()
.filter(CompletableFuture::isDone)
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
调用者使用方法的方式,不会改变。
对于生产用途,值得重写 completeOnTimeout 方法以重用 ScheduledExecutorService,但这也需要添加关闭代码或创建守护线程的线程工厂。使用 Java 9 或更高版本,您可以免费获得。