【发布时间】:2020-01-16 16:56:46
【问题描述】:
有没有办法尝试等待CompletableFuture 一段时间,然后返回不同的结果而不在超时后取消未来?
我有一个服务(我们称它为expensiveService),它跑去做自己的事。它返回一个结果:
enum Result {
COMPLETED,
PROCESSING,
FAILED
}
我愿意 [阻止并] 等待它一小段时间(比如说 2 秒)。如果它没有完成,我想返回不同的结果,但我希望服务继续做自己的事情。然后询问服务是否完成(例如通过 websockets 或其他)将是客户的工作。
即我们有以下几种情况:
-
expensiveService.processAndGet()需要 1 秒并完成它的未来。它返回COMPLETED。 -
expensiveService.processAndGet()1 秒后失败。它返回FAILED。 -
expensiveService.processAndGet()需要 5 秒并完成它的未来。它返回PROCESSING。如果我们向其他服务询问结果,我们会得到COMPLETED。 -
expensiveService.processAndGet()5 秒后失败。它返回PROCESSING。如果我们向其他服务询问结果,我们会得到FAILED。
在这种特定情况下,我们实际上需要在超时时获取当前结果对象,从而导致以下额外的边缘情况。这会导致以下建议的解决方案出现一些问题:
-
expensiveService.processAndGet()需要 2.01 秒并完成它的未来。它返回PROCESSING或COMPLETED。
我也在使用 Vavr,并愿意接受使用 Vavr 的Future 的建议。
我们创建了三种可能的解决方案,它们都有各自的优点和缺点:
#1 等待另一个未来
CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
Thread.sleep(2000);
return null;
}).map(v -> resultService.get(processId)).toCompletableFuture(),
Function.identity());
问题
- 总是调用第二个
resultService。 - 我们占用整个线程 2 秒。
#1a 等待另一个 Future 检查第一个 Future
CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
int attempts = 0;
int timeout = 20;
while (!f.isDone() && attempts * timeout < 2000) {
Thread.sleep(timeout);
attempts++;
}
return null;
}).map(v -> resultService.get(processId)).toCompletableFuture(),
Function.identity());
问题
- 第二个
resultService仍然被调用。 - 我们需要将第一个 Future 传递给第二个,这不是很干净。
#2 Object.notify
Object monitor = new Object();
CompletableFuture<Upload> process = expensiveService.processAndGet();
synchronized (monitor) {
process.whenComplete((r, e) -> {
synchronized (monitor) {
monitor.notifyAll();
}
});
try {
int attempts = 0;
int timeout = 20;
while (!process.isDone() && attempts * timeout < 2000) {
monitor.wait(timeout);
attempts++;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (process.isDone()) {
return process.toCompletableFuture();
} else {
return CompletableFuture.completedFuture(resultService.get(processId));
}
问题
- 复杂的代码(可能存在错误,可读性差)。
#3 Vavr 的Future.await
return Future.of(() -> expensiveService.processAndGet()
.await(2, TimeUnit.SECONDS)
.recoverWith(e -> {
if (e instanceof TimeoutException) {
return Future.successful(resultService.get(processId));
} else {
return Future.failed(e);
}
})
.toCompletableFuture();
问题
- 需要 Future in a Future 以避免
await取消内部 Future。 - 将第一个 Future 移入第二个会破坏依赖于
ThreadLocals 的 [遗留] 代码。 -
recoverWith和捕捉TimeoutException并不是那么优雅。
#4 CompletableFuture.orTimeout
return expensiveService.processAndGet()
.orTimeout(2, TimeUnit.SECONDS)
.<CompletableFuture<Upload>>handle((u, e) -> {
if (u != null) {
return CompletableFuture.completedFuture(u);
} else if (e instanceof TimeoutException) {
return CompletableFuture.completedFuture(resultService.get(processId));
} else {
return CompletableFuture.failedFuture(e);
}
})
.thenCompose(Function.identity());
问题
- 虽然在我的情况下,
processAndGet未来没有被取消,但根据文档,它应该是。 - 异常处理不好。
#5 CompletableFuture.completeOnTimeout
return expensiveService.processAndGet()
.completeOnTimeout(null, 2, TimeUnit.SECONDS)
.thenApply(u -> {
if (u == null) {
return resultService.get(processId);
} else {
return u;
}
});
问题
- 虽然在我的情况下,
processAndGet未来尚未完成,但根据文档,它应该是。 - 如果
processAndGet想将null作为不同的状态返回怎么办?
所有这些解决方案都有缺点并且需要额外的代码,但这感觉就像是 CompletableFuture 或 Vavr 的开箱即用的 Future 应该支持的东西。有没有更好的方法来做到这一点?
【问题讨论】:
-
我会为此使用CompletableFuture.completeOnTimeout;如
expensiveService.completeOnTimeout(Result.TIMED_OUT, 2, TimeUnit.SECONDS).get()。但我不明白你的 2.01 要求;如果 2.01 秒是有效的持续时间,为什么不超过 2.01 秒或更多作为超时? -
谢谢。明天我会看看这是否符合我的要求。
-
2.01 s 并不是一个有效的持续时间,只是它可能会进入一个“竞争条件”,即在 未来已经完成之后获取结果。例如。第一个服务实际上需要 2.01 秒,但在当前状态下等待和获取结果实际上需要 2.02 秒。它只是一个接近2s但大于它的任意数字。
-
您如何确定
orTimeout没有按照记录完成未来? -
@VGR
CompletableFuture通常不支持中断。正如文档所说“方法cancel与completeExceptionally(new CancellationException())具有相同的效果”
标签: java completable-future thread-sleep vavr