【问题标题】:CompletableFuture return all finished result after timeoutCompletableFuture 在超时后返回所有完成的结果
【发布时间】:2019-06-11 22:22:03
【问题描述】:

我想要一份我想要等待的可完成期货的清单。使用以下代码。

  public static <T> CompletableFuture<List<T>> finishAllQuery(
  List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture =
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(
    v -> futures.stream().filter(Objects::nonNull)
        .map(future -> future.join())
        .collect(Collectors.toList())
);
  }
CompletableFuture<List<Result>> allResponse = finishAllQuery(futures);
allResponse.get(5, milliseconds)

问题是在所有的future中,有些可能很慢,我希望在过期时间之后,get方法返回所有完成的结果。有没有办法做到这一点?

非常感谢。

【问题讨论】:

    标签: java completable-future


    【解决方案1】:

    如果您不一定希望所有期货都完成,那么您不应该使用allOf()。您可以改为使用 CompletionService,然后使用带有超时的 poll() 方法。所以流程将是:

    1. 将您的 ExecutorService 包装在 CompletionService 中
    2. 向 CompletionService 提交 N 个任务。然后轮询具有超时的单个任务。比如:

    经过时间 = 0; 可用时间 = 5 毫秒; CompleteFutures = List();

    while (elapsedTime < availableTime):
        remainingTime = availableTime - elapsedTime; 
        startTime = currentTime();
        completedFutures.add(CompletionService.poll(timeout=remainingTime));
        elapsedTime += (currentTime() - startTime)
    

    【讨论】:

      【解决方案2】:

      这应该由finishAllQuery 自己处理。例如,从 Java 9 开始,您可以使用

      public static <T> CompletableFuture<List<T>> finishAllQuery(
          List<CompletableFuture<T>> futures, long timeOut, TimeUnit unit) {
      
          return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
              .completeOnTimeout(null, timeOut, unit)
              .thenApply(v -> futures.stream()
                  .filter(CompletableFuture::isDone)
                  .map(CompletableFuture::join)
                  .collect(Collectors.toList()));
      }
      

      使用completeOnTimeout,我们可以在超时后使用预定义的值强制完成未来。这里我们只使用null,因为allOf的结果值无论如何都无所谓。

      我们只需要添加一个过滤条件,跳过所有尚未完成的futures,否则join会阻塞线程。

      这可以像这样使用

      CompletableFuture<List<Result>> allResponse
          = finishAllQuery(futures, 5, TimeUnit.MILLISECONDS);
      List<Result> list = allResponse.join(); // will wait at most 5 milliseconds
      

      对于 Java 8,我们可以使用

      static <T> CompletableFuture<T> completeOnTimeout(
          CompletableFuture<T> cf, T value, long timeOut, TimeUnit unit) {
      
          ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
          ScheduledFuture<Boolean> job = e.schedule(() -> cf.complete(value), timeOut, unit);
          return cf.whenComplete((x,y) -> { job.cancel(false); e.shutdown(); });
      }
      

      对于缺少的功能,需要稍微重写:

      public static <T> CompletableFuture<List<T>> finishAllQuery(
          List<CompletableFuture<T>> futures, long timeOut, TimeUnit unit) {
          return completeOnTimeout(
              CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])),
              null, timeOut, unit)
              .thenApply(v -> futures.stream()
                  .filter(CompletableFuture::isDone)
                  .map(CompletableFuture::join)
                  .collect(Collectors.toList()));
      }
      

      调用者使用方法的方式,不会改变。

      对于生产用途,值得重写 completeOnTimeout 方法以重用 ScheduledExecutorService,但这也需要添加关闭代码或创建守护线程的线程工厂。使用 Java 9 或更高版本,您可以免费获得。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2022-07-09
        • 1970-01-01
        • 2019-05-14
        • 1970-01-01
        • 2012-08-19
        • 2023-03-22
        • 1970-01-01
        • 2020-10-08
        相关资源
        最近更新 更多