【问题标题】:How to handle joining multiple asynchronous CompletableFutures?如何处理加入多个异步 CompletableFutures?
【发布时间】:2016-12-10 07:23:59
【问题描述】:

我听说CompletableFuture 能够将多个期货与runAfterBoth 合并,但如果我想合并两个以上的期货怎么办?

CompletableFuture<Boolean> a = new CompletableFuture<>();
CompletableFuture<Boolean> b = new CompletableFuture<>();
CompletableFuture<Boolean> c = new CompletableFuture<>();

List<CompletableFuture<Boolean>> list = new LinkedList<>();

list.add(a);
list.add(b);
list.add(c);

// Could be any number
for (CompletableFuture<Boolean> f : list) {
   f.runAfter..
}

我的用例是我将消息发送到多个套接字以定位单个对象,该对象可能在其中任何一个上也可能不在。

我目前正在将其视为解决方案:

CompletableFuture<Boolean> a = new CompletableFuture<>();
CompletableFuture<Boolean> b = new CompletableFuture<>();
CompletableFuture<Boolean> c = new CompletableFuture<>();

List<CompletableFuture<Boolean>> list = new LinkedList<>();

list.add(a);
list.add(b);
list.add(c);

CompletableFuture<Boolean> result = new CompletableFuture<>();

Thread accept = new Thread(() -> {
   for (CompletableFuture<Boolean> f : list)
      if (f.join() != null)
         result.complete(f.join());
});

accept.start();

// Actual boolean value returned
result.get();

但这有点乱。就我而言,我想在得到有效结果(非空)后立即继续处理,而不是等待无效结果。

例如,a 需要 5 秒,即使 b 已经在 2 秒内完成,循环也在等待它;但循环不知道这一点,因为它仍在等待a

是否有一种模式可用于加入多个异步期货,我可以在成功完成后立即做出响应?

另一种可能性:

public static class FutureUtil {
public static <T> CompletableFuture<T> anyOfNot(
   Collection<CompletableFuture<T>> collection,
   T value,
   T defaultValue)
{
   CompletableFuture<T> result = new CompletableFuture<>();

   new Thread(() -> {
      for (CompletableFuture<T> f : collection) {
         f.thenAccept((
            T r) -> {
            if ((r != null && !r.equals(value))
               || (value != null && !value.equals(r)))
               result.complete(r);
         });
      }

      try {
         for (CompletableFuture<T> f : collection)
            f.get();
      }
      catch (Exception ex) {
         result.completeExceptionally(ex);
      }

      result.complete(defaultValue);
   }).start();

   return result;
}
}

使用示例:

CompletableFuture<Boolean> a = new CompletableFuture<>();
CompletableFuture<Boolean> b = new CompletableFuture<>();
CompletableFuture<Boolean> c = new CompletableFuture<>();

List<CompletableFuture<Boolean>> list = new LinkedList<>();

list.add(a);
list.add(b);
list.add(c);

CompletableFuture<Boolean> result = FutureUtil.anyOfNot(list, null, false);

result.get();

【问题讨论】:

  • 您在寻找CompletableFuture.allOf()吗?
  • 有点。如果其中一个已经以有效结果(非空)完成,我不想等待其他期货。
  • 那么也许anyOf ?
  • 如果布尔值返回 null,anyOf 将完成。我只想要返回非空结果的未来。
  • 类似问题有有用的答案here

标签: java asynchronous completable-future


【解决方案1】:

如果您知道 List 中的至少一个 CF 将以非空值完成,您可以试试这个:

public static <T> CompletableFuture<T> firstNonNull(List<CompletableFuture<T>> completableFutures) {

    final CompletableFuture<T> completableFutureResult = new CompletableFuture<>();
    completableFutures.forEach(cf -> cf.thenAccept(v -> {
        if (v != null) {
            completableFutureResult.complete(v);
        }
    }));
    return completableFutureResult;
}

如果不能保证至少有一个 CF 将返回非空值,则需要更复杂的东西:

public static <T> CompletableFuture<T> firstNonNull(List<CompletableFuture<T>> completableFutures, T defaultValue) {

    final CompletableFuture<T> completableFutureResult = new CompletableFuture<>();
    completableFutures.forEach(cf -> cf.thenAccept(v -> {
        if (v != null) {
            completableFutureResult.complete(v);
        }
    }));
    //handling the situation where all the CFs returned null 
    CompletableFuture<Void> allCompleted = CompletableFuture
        .allOf((CompletableFuture<?>[]) completableFutures.toArray());
    allCompleted.thenRun(() -> {
        //checking first if any of the completed delivered a non-null value, to avoid race conditions with the block above 
        completableFutures.forEach(cf -> {
            final T result = cf.join();
            if (result != null) {
                completableFutureResult.complete(result);
            }
        });
        //if still not completed, completing with default value
        if ( !completableFutureResult.isDone()) {
            completableFutureResult.complete(defaultValue);
        }
    });
    return completableFutureResult;
}

【讨论】:

  • 感谢您的回答。但这与我问题末尾的代码示例几乎相同,但没有异常处理和默认值。请注意,您无需在已完成的未来上检查 isDone()
  • 在您的解决方案中,您正在创建一个不必要的Thread,并阻止它。我已经向您展示了您不需要额外的线程来完成相同的工作。 请注意,您不需要检查 isDone() 已经完成的未来:嗯,您无法知道它是否在那个阶段完成。可能是allOf 块在thenAccept 之前执行。您对异常处理是正确的,但这不是您问题的一部分。
  • 尝试运行您的代码时出现以下异常。你确定吗? java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.util.concurrent.CompletableFuture
  • 哪一行?可以发个例子吗?
  • (CompletableFuture&lt;?&gt;[]) completableFutures.toArray()
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-02-22
  • 1970-01-01
  • 2017-02-03
  • 1970-01-01
  • 2016-05-25
  • 1970-01-01
  • 2011-03-18
相关资源
最近更新 更多