【问题标题】:CompletableFuture: Waiting for first one normally return?CompletableFuture:等待第一个正常返回?
【发布时间】:2016-02-28 01:08:12
【问题描述】:

我有一些CompletableFutures,我想并行运行它们,等待第一个返回正常

我知道我可以使用CompletableFuture.anyOf 等待第一个返回,但这将返回正常异常。我想忽略异常。

List<CompletableFuture<?>> futures = names.stream().map(
  (String name) ->
    CompletableFuture.supplyAsync(
      () ->
        // this calling may throw exceptions.
        new Task(name).run()
    )
).collect(Collectors.toList());
//FIXME Can not ignore exceptionally returned takes.
Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[]{}));
try {
    logger.info(any.get().toString());
} catch (Exception e) {
    e.printStackTrace();
}

【问题讨论】:

    标签: java java-8 completable-future


    【解决方案1】:

    您可以使用以下辅助方法:

    public static <T>
        CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
    
        CompletableFuture<T> f=new CompletableFuture<>();
        Consumer<T> complete=f::complete;
        l.forEach(s -> s.thenAccept(complete));
        return f;
    }
    

    您可以像这样使用它,以证明它将忽略早期的异常但返回第一个提供的值:

    List<CompletableFuture<String>> futures = Arrays.asList(
        CompletableFuture.supplyAsync(
            () -> { throw new RuntimeException("failing immediately"); }
        ),
        CompletableFuture.supplyAsync(
            () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
              return "with 5s delay";
            }),
        CompletableFuture.supplyAsync(
            () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
              return "with 10s delay";
            })
    );
    CompletableFuture<String> c = anyOf(futures);
    logger.info(c.join());
    

    此解决方案的一个缺点是,如果 all 期货异常完成,它将永远完成。如果计算成功则提供第一个值但如果根本没有成功计算则异常失败的解决方案涉及更多:

    public static <T>
        CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
    
        CompletableFuture<T> f=new CompletableFuture<>();
        Consumer<T> complete=f::complete;
        CompletableFuture.allOf(
            l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
        ).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
        return f;
    }
    

    它利用了 allOf 的异常处理程序仅在所有期货都完成后(无论是否例外)才被调用的事实,并且未来只能完成一次(抛开像 obtrude… 这样的特殊事物)。当异常处理程序被执行时,任何试图以结果完成未来的尝试都已经完成(如果有的话),因此只有在之前没有成功完成的情况下,尝试异常完成它才会成功。

    它可以与第一个解决方案完全相同的方式使用,并且只有在所有计算失败时才会表现出不同的行为,例如:

    List<CompletableFuture<String>> futures = Arrays.asList(
        CompletableFuture.supplyAsync(
            () -> { throw new RuntimeException("failing immediately"); }
        ),
        CompletableFuture.supplyAsync(
            // delayed to demonstrate that the solution will wait for all completions
            // to ensure it doesn't miss a possible successful computation
            () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
                throw new RuntimeException("failing later"); }
        )
    );
    CompletableFuture<String> c = anyOf(futures);
    try { logger.info(c.join()); }
    catch(CompletionException ex) { logger.severe(ex.toString()); }
    

    上面的示例使用延迟来演示解决方案将在没有成功时等待所有完成,而this example on ideone 将演示稍后的成功如何将结果变为成功。请注意,由于 Ideones 缓存结果,您可能不会注意到延迟。

    请注意,如果所有期货都失败,则无法保证会报告哪些异常。由于它在错误情况下等待所有完成,因此任何人都可以到达最终结果。

    【讨论】:

    【解决方案2】:

    考虑到:

    1. Java 哲学的基础之一是防止或阻止不良编程实践。

      (它在多大程度上成功做到了这一点是另一个争论的主题;重点仍然是,这无疑是该语言的主要目标之一。)

    2. 忽略异常是一种非常糟糕的做法。

      异常应该总是重新抛出到上面的层,或者处理,或者至少报告。具体来说, 异常应该永远不会被默默吞下。

    3. 应尽早报告错误。

      例如,看看运行时为了提供 fail fast 迭代器所经历的痛苦,如果集合在迭代时被修改,则会抛出 ConcurrentModificationException

    4. 忽略异常完成的 CompletableFuture 意味着 a) 您没有尽早报告错误,并且 b) 您很可能计划根本不报告错误。

    5. 不能简单地等待第一个非异常完成而不得不被异常完成所困扰,这不会带来任何重大负担,因为您总是可以从列表中删除异常完成的项目,(同时不要忘记报告失败,对吗?)然后重复等待。

    因此,如果 Java 中有意缺少所寻求的功能,我不会感到惊讶,并且我愿意争辩说它正确地地丢失了。 p>

    (抱歉,Sotirios,没有规范的答案。)

    【讨论】:

    • 考虑备用信息源(例如热插拔备份或负载平衡集群)。如果来源是可互换的,已知偶尔会失败,并且需要大量时间来响应,那么忽略一些错误是完全合法且可取的。
    • @Basilevs true,但最好还是记录它们并忽略日志消息。没有任何记录的任何类型的失败都不是一个好主意。
    【解决方案3】:

    嗯,这是框架应该支持的方法。首先,我认为CompletionStage.applyToEither 做了类似的事情,但事实证明它没有。所以我想出了这个解决方案:

    public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
      final int count = stages.size();
      if (count <= 0) {
        throw new IllegalArgumentException("stages must not be empty");
      }
      final AtomicInteger settled = new AtomicInteger();
      final CompletableFuture<U> future = new CompletableFuture<U>();
      BiConsumer<U, Throwable> consumer = (val, exc) -> {
        if (exc == null) {
          future.complete(val);
        } else {
          if (settled.incrementAndGet() >= count) {
            // Complete with the last exception. You can aggregate all the exceptions if you wish.
            future.completeExceptionally(exc);
          }
        }
      };
      for (CompletionStage<U> item : stages) {
        item.whenComplete(consumer);
      }
      return future;
    }
    

    要查看它的实际效果,这里有一些用法:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.BiConsumer;
    
    public class Main {
      public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
        final int count = stages.size();
        if (count <= 0) {
          throw new IllegalArgumentException("stages must not be empty");
        }
        final AtomicInteger settled = new AtomicInteger();
        final CompletableFuture<U> future = new CompletableFuture<U>();
        BiConsumer<U, Throwable> consumer = (val, exc) -> {
          if (exc == null) {
            future.complete(val);
          } else {
            if (settled.incrementAndGet() >= count) {
              // Complete with the last exception. You can aggregate all the exceptions if you wish.
              future.completeExceptionally(exc);
            }
          }
        };
        for (CompletionStage<U> item : stages) {
          item.whenComplete(consumer);
        }
        return future;
      }
    
      private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
    
      public static <U> CompletionStage<U> delayed(final U value, long delay) {
        CompletableFuture<U> future = new CompletableFuture<U>();
        worker.schedule(() -> {
          future.complete(value);
        }, delay, TimeUnit.MILLISECONDS);
        return future;
      }
      public static <U> CompletionStage<U> delayedExceptionally(final Throwable value, long delay) {
        CompletableFuture<U> future = new CompletableFuture<U>();
        worker.schedule(() -> {
          future.completeExceptionally(value);
        }, delay, TimeUnit.MILLISECONDS);
        return future;
      }
    
      public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("Started...");
    
        /*
        // Looks like applyToEither doesn't work as expected
        CompletableFuture<Integer> a = CompletableFuture.completedFuture(99);
        CompletableFuture<Integer> b = Main.<Integer>completedExceptionally(new Exception("Exc")).toCompletableFuture();
        System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc
        */
    
        try {
          List<CompletionStage<Integer>> futures = new ArrayList<>();
          futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #1"), 100));
          futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #2"), 200));
          futures.add(delayed(1, 1000));
          futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #4"), 400));
          futures.add(delayed(2, 500));
          futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #5"), 600));
          Integer value = firstCompleted(futures).toCompletableFuture().get();
          System.out.println("Completed normally: " + value);
        } catch (Exception ex) {
          System.out.println("Completed exceptionally");
          ex.printStackTrace();
        }
    
        try {
          List<CompletionStage<Integer>> futures = new ArrayList<>();
          futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#1"), 400));
          futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#2"), 200));
          Integer value = firstCompleted(futures).toCompletableFuture().get();
          System.out.println("Completed normally: " + value);
        } catch (Exception ex) {
          System.out.println("Completed exceptionally");
          ex.printStackTrace();
        }
    
        System.out.println("End...");
      }
    
    }
    

    【讨论】:

      【解决方案4】:

      对上面的代码做了一些修改,允许测试第一个结果是否符合预期。

      public class MyTask implements Callable<String> {
      
          @Override
          public String call() throws Exception {
              int randomNum = ThreadLocalRandom.current().nextInt(5, 20 + 1);
              for (int i = 0; i < randomNum; i++) {
                  TimeUnit.SECONDS.sleep(1);
              }
              return "MyTest" + randomNum;
          }
      }
      
      
      public class CompletableFutureUtils {
      
          private static <T> T resolve(FutureTask<T> futureTask) {
              try {
                  futureTask.run();
                  return futureTask.get();
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
          }
      
          private static <V> boolean predicate(Predicate<V> predicate, V v) {
              try {
                  return predicate.test(v);
              } catch (Exception e) {
                  return false;
              }
          }
      
          public static <T> void cancel(List<FutureTask<T>> futureTasks) {
              if (futureTasks != null && futureTasks.isEmpty() == false) {
                  futureTasks.stream().filter(f -> f.isDone() == false).forEach(f -> f.cancel(true));
              }
          }
      
          public static <V> CompletableFuture<V> supplyAsync(List<FutureTask<V>> futureTasks, Predicate<V> predicate) {
              return supplyAsync(futureTasks, predicate, null);
          }
      
          public static <V> CompletableFuture<V> supplyAsync(List<FutureTask<V>> futureTasks, Predicate<V> predicate,
                  Executor executor) {
              final int count = futureTasks.size();
              final AtomicInteger settled = new AtomicInteger();
              final CompletableFuture<V> result = new CompletableFuture<V>();
              final BiConsumer<V, Throwable> action = (value, ex) -> {
                  settled.incrementAndGet();
                  if (result.isDone() == false) {
                      if (ex == null) {
                          if (predicate(predicate, value)) {
                              result.complete(value);
                              cancel(futureTasks);
                          } else if (settled.get() >= count) {
                              result.complete(null);
                          }
                      } else if (settled.get() >= count) {
                          result.completeExceptionally(ex);
                      }
                  }
              };
              for (FutureTask<V> futureTask : futureTasks) {
                  if (executor != null) {
                      CompletableFuture.supplyAsync(() -> resolve(futureTask), executor).whenCompleteAsync(action, executor);
                  } else {
                      CompletableFuture.supplyAsync(() -> resolve(futureTask)).whenCompleteAsync(action);
                  }
              }
              return result;
          }
      }
      
      public class DemoApplication {
          public static void main(String[] args) {
              List<FutureTask<String>> tasks = new ArrayList<FutureTask<String>>();
              for (int i = 0; i < 2; i++) {
                  FutureTask<String> task = new FutureTask<String>(new MyTask());
                  tasks.add(task);
              }
              Predicate<String> test = (s) -> true;
              CompletableFuture<String> result = CompletableFutureUtils.supplyAsync(tasks, test);
              try {
                  String s = result.get(20, TimeUnit.SECONDS);
                  System.out.println("result=" + s);
              } catch (Exception e) {
                  e.printStackTrace();
                  CompletableFutureUtils.cancel(tasks);
              }
          }
      }
      

      调用CompletableFutureUtils.cancel(tasks);非常重要,所以当超时发生时,它会取消后台任务。

      【讨论】:

        【解决方案5】:

        我发现 Vertx - CompositeFuture.any 方法在这种情况下非常有用。它专为完全相同的情况而设计。当然你必须用户vertx定义Future。 Vertx CompositeFuture API Docs

        【讨论】:

          猜你喜欢
          • 2023-03-14
          • 2017-02-10
          • 2019-05-14
          • 2018-11-08
          • 2021-07-01
          • 1970-01-01
          • 1970-01-01
          • 2018-12-12
          • 1970-01-01
          相关资源
          最近更新 更多