【问题标题】:Timeout with default value in Java 8 CompletableFutureJava 8 CompletableFuture 中的默认值超时
【发布时间】:2014-06-27 19:27:30
【问题描述】:

假设我有一些异步计算,例如:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .thenAccept(foo -> doStuffWithFoo(foo));

如果异步供应商根据某些指定的超时超时,是否有一种很好的方法可以为 foo 提供默认值?理想情况下,此类功能也会尝试取消运行缓慢的供应商。例如,是否有类似于以下假设代码的标准库功能:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .acceptEither(
                CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
                foo -> doStuffWithFoo(foo));

或者甚至更好:

CompletableFuture
        .supplyAsync(() -> createFoo())
        .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
        .thenAccept(foo -> doStuffWithFoo(foo));

我知道get(timeout, unit),但我想知道是否有更好的标准方法以异步和反应方式应用超时,如上面的代码中所建议的那样。

编辑:这是一个受Java 8: Mandatory checked exceptions handling in lambda expressions. Why mandatory, not optional? 启发的解决方案,但不幸的是它阻塞了一个线程。如果我们依赖 createFoo() 来异步检查超时并抛出它自己的超时异常,它会在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建异常的成本(这可以没有“快速投掷”就很贵)

static <T> Supplier<T> wrapped(Callable<T> callable) {
    return () -> {
        try {
            return callable.call();
        } catch (RuntimeException e1) {
            throw e1;
        } catch (Throwable e2) {
            throw new RuntimeException(e2);
        }
    };
}
CompletableFuture
        .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
        .exceptionally(e -> "default")
        .thenAcceptAsync(s -> doStuffWithFoo(foo));

【问题讨论】:

    标签: java java-8


    【解决方案1】:

    CompletableFuture.supplyAsync 只是一个帮助方法,它为您创建 CompletableFuture,并将任务提交到 ForkJoin Pool。

    您可以根据自己的要求创建自己的 supplyAsync,如下所示:

    private static final ScheduledExecutorService schedulerExecutor = 
                                     Executors.newScheduledThreadPool(10);
    private static final ExecutorService executorService = 
                                     Executors.newCachedThreadPool();
    
    
    public static <T> CompletableFuture<T> supplyAsync(
            final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
            T defaultValue) {
    
        final CompletableFuture<T> cf = new CompletableFuture<T>();
    
        // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a 
        // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
        // Using Executors.newCachedThreadPool instead in the example
        // submit task
        Future<?> future = executorService.submit(() -> {
            try {
                cf.complete(supplier.get());
            } catch (Throwable ex) {
                cf.completeExceptionally(ex);
            }
        });
    
        //schedule watcher
        schedulerExecutor.schedule(() -> {
            if (!cf.isDone()) {
                cf.complete(defaultValue);
                future.cancel(true);
            }
    
        }, timeoutValue, timeUnit);
    
        return cf;
    }
    

    使用该助手创建 CompletableFuture 就像使用 CompletableFuture 中的静态方法一样简单:

        CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
                TimeUnit.SECONDS, "default");
    

    测试它:

        a = supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e1) {
                // ignore
            }
            return "hi";
        }, 1, TimeUnit.SECONDS, "default");
    

    【讨论】:

    • 我想知道 Thread.sleep(2000);在你的例子中实际上被打断了。它不是。如果我将您的示例从 ForkJoinPool.commonPool().submit 更改为 Executors.newFixedThreadPool(1).submit,那么就是这样。我想知道为什么...
    • 你是对的@Peti! ForkJoinPool 中的 commonPool 提供了一个 ForkJoinTask 类型的 Future 实现,在取消的情况下不会中断 Future,而 Executors 提供了一个 FutureTask 。请参阅 ForkJoinTaskcancel 方法中的 Javadocs:mayInterruptIfRunning:此值在默认实现中无效,因为不使用中断来控制取消
    • 这会创建一个新线程每个调用。从性能的角度来看,这绝对是荒谬的。
    • @Ruben 是否有理由将工作拆分为缓存池和计划池?也就是说,你能不能去掉executorServicefuture,只处理提交给`schedulerExecutor的callable中的cf
    【解决方案2】:

    在 Java 9 中,会有completeOnTimeout(T value, long timeout, TimeUnit unit),它做你想做的事,虽然它不会取消慢供应商。

    还有一个orTimeout(long timeout, TimeUnit unit),在超时的情况下异常完成。

    【讨论】:

      【解决方案3】:

      DZone 有一篇很好的文章如何解决这个问题:https://dzone.com/articles/asynchronous-timeouts

      我不确定代码的版权,因此我不能在这里复制它。该解决方案与 Dane White 的解决方案非常相似,但它使用带有单个线程的线程池加上 schedule() 以避免浪费线程只是为了等待超时。

      它还会抛出 TimeoutException 而不是返回默认值。

      【讨论】:

      • 有一个问题。在 dzone 中给出的最后一个示例代码中,如果 asyncCode() 在 1s 内无法运行,则采用异常路径。我的问题是,如果这个asyncCode() 只是花费了太多时间来完成(比如说,一分钟)并且在那段时间里一大堆请求(10K)来并调用了这段代码,会发生什么?简而言之,每次收到请求时都会调用asyncCode(),但在 1 秒后它被主线程放弃,即使子线程正在处理它。那么对于 10K 请求,由于 10K 挂起的子线程,这不会耗尽可用内存吗?
      • @CyriacGeorge 可能不会。首先,在某个地方会有一个线程池,它根本不会运行数千个线程,而是配置的最大数量。另一件事是工作线程确实完成了——它们没有被杀死。所以线程池最终会用完线程,请求将不会被接受/添加到池中。当耗时太长的工作完成后,线程将返回到池中。
      • @CyriacGeorge 谨慎使用此设计;这不是万无一失的。如果您有可能永远卡住的任务,则池最终将耗尽。此外,应该有一些方法可以取消工人,告诉他们“我们不再关心”。
      • ScheduledExecutorService 单线程 - 只要底层 API 负载很重就足够了吗?
      【解决方案4】:

      我认为在提供默认值时,您总是需要额外的线程监控。我可能会走两个supplyAsync调用的路线,默认值包装在一个实用API中,由acceptEither链接。如果您更愿意包装您的供应商,那么您可以使用实用程序 API 为您进行“任一”调用:

      public class TimeoutDefault {
          public static <T> CompletableFuture<T> with(T t, int ms) {
              return CompletableFuture.supplyAsync(() -> {
                  try {
                      Thread.sleep(ms);
                  } catch (InterruptedException e) { }
                  return t;
              });
          }
      
          public static <T> Supplier<T> with(Supplier<T> supplier, T t, int ms) {
              return () -> CompletableFuture.supplyAsync(supplier)
                  .applyToEither(TimeoutDefault.with(t, ms), i -> i).join();
          }
      }
      
      CompletableFuture<Void> future = CompletableFuture
              .supplyAsync(Example::createFoo)
              .acceptEither(
                  TimeoutDefault.with("default", 1000),
                  Example::doStuffWithFoo);
      
      CompletableFuture<Void> future = CompletableFuture
              .supplyAsync(TimeoutDefault.with(Example::createFoo, "default", 1000))
              .thenAccept(Example::doStuffWithFoo);
      

      【讨论】:

      • 在线程上休眠并不是进行异步编程的好方法。您实际上是在白白浪费线程空间。
      【解决方案5】:

      没有标准库方法可用于构造 CompletableFuture,并在超时后提供值。也就是说,以最少的资源开销自行推出非常简单:

      private static final ScheduledExecutorService EXECUTOR
              = Executors.newSingleThreadScheduledExecutor();
      
      public static <T> CompletableFuture<T> delayedValue(final T value,
                                                          final Duration delay) {
          final CompletableFuture<T> result = new CompletableFuture<>();
          EXECUTOR.schedule(() -> result.complete(value),
                            delay.toMillis(), TimeUnit.MILLISECONDS);
          return result;
      }
      

      可以和CompleteableFuture的“either”方法一起使用:

      • accceptEither, acceptEitherAsync
      • applyToEither, applyToEitherAsync
      • runAfterEither, runAfterEitherAsync

      如果远程服务调用超过某个延迟阈值,则一个应用程序正在使用缓存值:

      interface RemoteServiceClient {
          CompletableFuture<Foo> getFoo();
      }
      
      final RemoteServiceClient client = /* ... */;
      final Foo cachedFoo = /* ... */;
      final Duration timeout = /* ... */;
      
      client.getFoos()
          .exceptionally(ignoredException -> cachedFoo)
          .acceptEither(delayedValue(cachedFoo, timeout),
              foo -> /* do something with foo */)
          .join();
      

      如果远程客户端调用异常完成(例如SocketTimeoutException),我们可以快速失败并立即使用缓存的值。

      CompletableFuture.anyOf(CompletableFuture&lt;?&gt;...) 可以与delayedValue 原语结合使用以包装具有上述语义的CompletableFuture

      @SuppressWarnings("unchecked")
      public static <T> CompletableFuture<T> withDefault(final CompletableFuture<T> cf,
                                                         final T defaultValue,
                                                         final Duration timeout) {
          return (CompletableFuture<T>) CompletableFuture.anyOf(
              cf.exceptionally(ignoredException -> defaultValue),
              delayedValue(defaultValue, timeout));
      }
      

      这很好地简化了上面的远程服务调用示例:

      withDefault(client.getFoos(), cachedFoo, timeout)
          .thenAccept(foo -> /* do something with foo */)
          .join();
      

      CompletableFutures 更准确地称为承诺,因为它们将Future 的创建与完成分离。确保使用专用线程池来处理繁重的 CPU 工作。要为昂贵的计算创建CompletableFuture,您应该使用CompletableFuture#supplyAsync(Supplier, Executor) 重载,因为#supplyAsync(Supplier) 重载默认为常见的ForkJoinPool。返回的CompletableFuture 无法取消其任务,因为Executor 接口未公开此功能。更一般地说,依赖CompletableFutures 不会取消他们的父母,例如cf.thenApply(f).cancel(true) 不会取消 cf。如果您需要该功能,我建议坚持使用 ExecutorServices 返回的 Futures。

      【讨论】:

      • Executors.newSingleThreadScheduledExecutor() 是否足以满足繁重的负载,或者我们需要根据预期负载进行调整?
      猜你喜欢
      • 2017-05-14
      • 1970-01-01
      • 1970-01-01
      • 2012-07-17
      • 2017-12-25
      • 1970-01-01
      • 2016-06-06
      • 2012-05-05
      • 2021-01-24
      相关资源
      最近更新 更多