【问题标题】:Cancellation of CompletableFuture controlled by ExecutorServiceExecutorService 控制的 CompletableFuture 的取消
【发布时间】:2016-04-19 19:34:24
【问题描述】:

我有一个ExecutorService,它将计算的数据转发到CompletableFuture

class DataRetriever {
    private final ExecutorService service = ...;

    public CompletableFuture<Data> retrieve() {
        final CompletableFuture<Data> future = new CompletableFuture<>();

        service.execute(() -> {
            final Data data = ... fetch data ...
            future.complete(data);
        });
        return future;
    }
}

我希望客户/用户能够取消任务:

final DataRetriever retriever = new DataRetriever();
final CompletableFuture<Data> future = retriever().retrieve();

future.cancel(true);

这不起作用,因为这会取消外部CompletableFuture,但不会取消执行器服务中计划的内部未来。

是否有可能将外部未来的cancel() 传播到内部未来?

【问题讨论】:

  • 你为什么首先使用CompletableFuture?为什么不直接执行Callable&lt;Data&gt; 并从retrieve() 返回结果Future

标签: java concurrency java.util.concurrent


【解决方案1】:

CompletableFuture#cancel 实际上仅用于将CompletableFuture 标记为已取消。它不会通知正在执行的任务停止,因为它与任何此类任务都没有关系。

javadoc 暗示了这一点

mayInterruptIfRunning - 此值在此实现中无效,因为不使用中断来控制处理。

您示例中的 inner 未来是Future,而不是CompletableFuture,它确实与正在执行的Runnable(或Callable)有关。在内部,由于它知道任务正在执行的Thread,它可以向它发送interrupt 以尝试停止它。

一种选择是返回某种类型的元组(例如一些 POJO),它提供对您的CompletableFutureExecutorService#submit 返回的Future 的引用。如果需要,您可以使用Futurecancel。你必须记住cancelcomplete 你的CompletableFuture,这样你的代码的其他部分就不会永远被阻塞/饿死。

【讨论】:

  • 返回元组/pojo(包含外部和内部未来)的问题是客户端意识到内部实现。从他的角度来看,他应该不在乎。如果我们能以某种方式将外部和内部未来链接起来,那将会很有用。
  • 取消ExecutorService#submit返回的Future似乎值得一试。
  • @DaveTeezo 还有更复杂的解决方案,比如扩展CompletableFuture 以包装相关的Future,但我发现实现相当难以维护和不清楚。
【解决方案2】:

Pillar 提到的另一个解决方案是扩展 CompletableFuture。这是一种与您现有代码非常相似的方法。它还处理异常,这是一个很好的奖励。

class CancelableFuture<T> extends CompletableFuture<T> {
    private Future<?> inner;

    /**
     * Creates a new CancelableFuture which will be completed by calling the
     * given {@link Callable} via the provided {@link ExecutorService}.
     */
    public CancelableFuture(Callable<T> task, ExecutorService executor) {
        this.inner = executor.submit(() -> complete(task));
    }

    /**
     * Completes this future by executing a {@link Callable}. If the call throws
     * an exception, the future will complete with that exception. Otherwise,
     * the future will complete with the value returned from the callable.
     */
    private void complete(Callable<T> callable) {
        try {
            T result = callable.call();
            complete(result);
        } catch (Exception e) {
            completeExceptionally(e);
        }
    }

    @Override
    public boolean cancel(boolean mayInterrupt) {
        return inner.cancel(mayInterrupt) && super.cancel(true);
    }
}

然后,在DataRetriever,您可以简单地这样做:

public CompletableFuture<Data> retrieve() {
    return new CancelableFuture<>(() -> {... fetch data ...}, service);
}

【讨论】:

    【解决方案3】:

    使用我的Tascalate Concurrent 库,您的代码可以重写如下:

    class DataRetriever {
      private final ExecutorService service = ...;
    
      public Promise<Data> retrieve() {
        return CompletableTask.supplyAsync(() -> {
          final Data data = ... fetch data ...
          return data;
        }, service);
      }
    }
    

    PromiseCompletableTask 是我图书馆的课程,您可以在my blog 阅读更多内容

    【讨论】:

      【解决方案4】:

      通过向外部未来添加异常处理程序,您可以将对cancel 的调用传递给内部未来。这是有效的,因为CompletableFuture.cancel 导致未来以CancellationException 异常完成。

      private final ExecutorService service = ...;
      
      public CompletableFuture<Data> retrieve() {
          final CompletableFuture<Data> outer = new CompletableFuture<>();
      
          final Future<?> inner = service.submit(() -> {
              ...
              future.complete(data);
          });
      
          outer.exceptionally((error) -> {
              if (error instanceof CancellationException) {
                  inner.cancel(true);
              }
              return null; // must return something, because 'exceptionally' expects a Function
          });
      
          return outer;
      }
      

      outer.exceptionally 的调用会创建一个新的CompletableFuture,因此不会影响outer 本身的取消或异常状态。您仍然可以将您喜欢的任何其他CompletionStage 附加到outer,包括另一个exceptionally 阶段,它将按预期运行。

      【讨论】:

        猜你喜欢
        • 2019-02-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-08-01
        • 1970-01-01
        • 1970-01-01
        • 2021-09-20
        相关资源
        最近更新 更多