【问题标题】:Try waiting for a CompletableFuture尝试等待 CompletableFuture
【发布时间】:2020-01-16 16:56:46
【问题描述】:

有没有办法尝试等待CompletableFuture 一段时间,然后返回不同的结果而不在超时后取消未来?

我有一个服务(我们称它为expensiveService),它跑去做自己的事。它返回一个结果:

enum Result {
    COMPLETED,
    PROCESSING,
    FAILED
}

我愿意 [阻止并] 等待它一小段时间(比如说 2 秒)。如果它没有完成,我想返回不同的结果,但我希望服务继续做自己的事情。然后询问服务是否完成(例如通过 websockets 或其他)将是客户的工作。

即我们有以下几种情况:

  • expensiveService.processAndGet() 需要 1 秒并完成它的未来。它返回COMPLETED
  • expensiveService.processAndGet() 1 秒后失败。它返回FAILED
  • expensiveService.processAndGet() 需要 5 秒并完成它的未来。它返回PROCESSING。如果我们向其他服务询问结果,我们会得到COMPLETED
  • expensiveService.processAndGet() 5 秒后失败。它返回PROCESSING。如果我们向其他服务询问结果,我们会得到FAILED

在这种特定情况下,我们实际上需要在超时时获取当前结果对象,从而导致以下额外的边缘情况。这会导致以下建议的解决方案出现一些问题:

  • expensiveService.processAndGet() 需要 2.01 秒并完成它的未来。它返回PROCESSINGCOMPLETED

我也在使用 Vavr,并愿意接受使用 Vavr 的Future 的建议。

我们创建了三种可能的解决方案,它们都有各自的优点和缺点:

#1 等待另一个未来

CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            Thread.sleep(2000);
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());

问题

  1. 总是调用第二个resultService
  2. 我们占用整个线程 2 秒。

#1a 等待另一个 Future 检查第一个 Future

CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            int attempts = 0;
            int timeout = 20;
            while (!f.isDone() && attempts * timeout < 2000) {
                Thread.sleep(timeout);
                attempts++;
            }
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());

问题

  1. 第二个resultService 仍然被调用。
  2. 我们需要将第一个 Future 传递给第二个,这不是很干净。

#2 Object.notify

Object monitor = new Object();
CompletableFuture<Upload> process = expensiveService.processAndGet();
synchronized (monitor) {
    process.whenComplete((r, e) -> {
        synchronized (monitor) {
            monitor.notifyAll();
        }
    });
    try {
        int attempts = 0;
        int timeout = 20;
        while (!process.isDone() && attempts * timeout < 2000) {
            monitor.wait(timeout);
            attempts++;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
if (process.isDone()) {
    return process.toCompletableFuture();
} else {
    return CompletableFuture.completedFuture(resultService.get(processId));
}

问题

  1. 复杂的代码(可能存在错误,可读性差)。

#3 Vavr 的Future.await

return Future.of(() -> expensiveService.processAndGet()
        .await(2, TimeUnit.SECONDS)
        .recoverWith(e -> {
            if (e instanceof TimeoutException) {
                return Future.successful(resultService.get(processId));
            } else {
                return Future.failed(e);
            }
        })
        .toCompletableFuture();

问题

  1. 需要 Future in a Future 以避免 await 取消内部 Future。
  2. 将第一个 Future 移入第二个会破坏依赖于 ThreadLocals 的 [遗留] 代码。
  3. recoverWith 和捕捉 TimeoutException 并不是那么优雅。

#4 CompletableFuture.orTimeout

return expensiveService.processAndGet()
        .orTimeout(2, TimeUnit.SECONDS)
        .<CompletableFuture<Upload>>handle((u, e) -> {
            if (u != null) {
                return CompletableFuture.completedFuture(u);
            } else if (e instanceof TimeoutException) {
                return CompletableFuture.completedFuture(resultService.get(processId));
            } else {
                return CompletableFuture.failedFuture(e);
            }
        })
        .thenCompose(Function.identity());

问题

  1. 虽然在我的情况下,processAndGet 未来没有被取消,但根据文档,它应该是。
  2. 异常处理不好。

#5 CompletableFuture.completeOnTimeout

return expensiveService.processAndGet()
        .completeOnTimeout(null, 2, TimeUnit.SECONDS)
        .thenApply(u -> {
            if (u == null) {
                return resultService.get(processId);
            } else {
                return u;
            }
        });

问题

  1. 虽然在我的情况下,processAndGet 未来尚未完成,但根据文档,它应该是。
  2. 如果processAndGet 想将null 作为不同的状态返回怎么办?

所有这些解决方案都有缺点并且需要额外的代码,但这感觉就像是 CompletableFuture 或 Vavr 的开箱即用的 Future 应该支持的东西。有没有更好的方法来做到这一点?

【问题讨论】:

  • 我会为此使用CompletableFuture.completeOnTimeout;如expensiveService.completeOnTimeout(Result.TIMED_OUT, 2, TimeUnit.SECONDS).get()。但我不明白你的 2.01 要求;如果 2.01 秒是有效的持续时间,为什么不超过 2.01 秒或更多作为超时?
  • 谢谢。明天我会看看这是否符合我的要求。
  • 2.01 s 并不是一个有效的持续时间,只是它可能会进入一个“竞争条件”,即在 未来已经完成之后获取结果。例如。第一个服务实际上需要 2.01 秒,但在当前状态下等待和获取结果实际上需要 2.02 秒。它只是一个接近2s但大于它的任意数字。
  • 您如何确定orTimeout 没有按照记录完成未来?
  • @VGR CompletableFuture 通常不支持中断。正如文档所说“方法cancelcompleteExceptionally(new CancellationException())具有相同的效果

标签: java completable-future thread-sleep vavr


【解决方案1】:

首先值得指出的是,CompletableFuture 是如何工作的(或者为什么会这样命名):

CompletableFuture<?> f = CompletableFuture.supplyAsync(supplier, executionService);

基本上等价于

CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
    if(!f.isDone()) {
        try {
            f.complete(supplier.get());
        }
        catch(Throwable t) {
            f.completeExceptionally(t);
        }
    }
});

CompletableFutureExecutor 正在执行的代码之间没有任何联系,事实上,我们可以有任意数量的持续完成尝试。特定代码旨在完成 CompletableFuture 实例的事实只有在调用完成方法之一时才会变得明显。

因此,CompletableFuture 不能以任何方式影响正在运行的操作,这包括取消时中断等。正如the documentation of CompletableFuture 所说:

方法cancelcompleteExceptionally(new CancellationException())效果一样

所以取消只是另一个完成尝试,如果它是第一个,它将获胜,但不会影响任何其他完成尝试。

所以orTimeout(long timeout, TimeUnit unit) 在这方面并没有太大的不同。超时后,它将执行相当于completeExceptionally(new TimeoutException()),如果没有其他完成尝试更快,它将获胜,这将影响依赖阶段,但不会影响其他正在进行的完成尝试,例如expensiveService.processAndGet() 在您的案例中发起了什么。

您可以像这样实现所需的操作

CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture<Upload> alternative = CompletableFuture.supplyAsync(
    () -> resultService.get(processId), CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS));
return future.applyToEither(alternative, Function.identity())
    .whenComplete((u,t) -> alternative.cancel(false));

对于delayedExecutor,我们使用与orTimeoutcompleteOnTimeout 相同的功能。当future.whenComplete 中的取消更快时,它不会在指定时间之前评估指定的Supplier 或根本不评估。 applyToEither 将提供更快的可用结果。

这不会在超时时完成future,但正如前面所说,它的完成不会影响原始计算,所以这也可以:

CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
    .execute(() -> {
        if(!future.isDone()) future.complete(resultService.get(processId));
    });
return future;

如前所述,这会在超时后完成未来,不会影响正在进行的计算,但会向调用者提供替代结果,但不会将resultService.get(processId) 抛出的异常传播到返回的未来。

【讨论】:

  • 谢谢。第一个解决方案看起来会做我想做的事。我选择了Object.await/notify,但这更优雅。
  • 但是,如果processAndGet()由几个阶段组成,或者需要一段时间才能开始,它仍然可以提前完成(例如在第二个解决方案中),从而不会开始或完成它工作。这就是我要避免的。
  • 我明白了。那么,如果processAndGet() 本身在返回前的最后阶段调用copy() 可能是一个不错的举措,此时它需要副作用。
  • 短语“希望计算继续”意味着后台计算将结果存储在其他地方,而不是仅存储在废弃的未来,否则“从其他地方获取的结果”将不会可能的。所以如果有这样的副作用,processAndGet() 方法应该关心这个计算状态。即使它被证明是不需要的,它也没有害处。它还可以使用.minimalCompletionStage() 作为其返回值,以表示调用者不应该操纵完成状态。
  • 回复CancellationException,我明白我犯的错误了。我在答案中修复了它。不需要isDone 检查,因为尝试完成已经完成的未来无论如何都没有效果。但是需要使取消依赖于 任一 阶段,以确保它在取消 alternative 阶段之前已完成,这对 任一阶段都不会产生任何影响 阶段。
猜你喜欢
  • 2021-07-01
  • 2018-11-08
  • 2020-08-12
  • 2023-01-23
  • 2021-06-02
  • 2016-02-28
  • 1970-01-01
  • 1970-01-01
  • 2021-06-10
相关资源
最近更新 更多