【问题标题】:Canceling a CompletableFuture chain取消 CompletableFuture 链
【发布时间】:2014-08-21 03:42:02
【问题描述】:

我有一个异步服务调用链,我想取消它。好吧,实际上,我有两个并行的服务调用链,如果一个成功,我想取消另一个。

对于番石榴的期货,我习惯于通过取消最后一个期货来取消整个期货链。看来我不能用 java-8 的期货来做到这一点。 除非有人知道如何做。

你的任务,如果你选择接受它,是告诉我是否可以保持我漂亮的语法并取消链。否则,我将编写自己的链接未来包装器 - 特别是在 this question 之后。


接下来是我自己的测试和尝试。

@Test
public void shouldCancelOtherFutures() {
    // guava
    ListenableFuture<String> as = Futures.immediateFuture("a");
    ListenableFuture<String> bs = Futures.transform(as, (AsyncFunction<String, String>) x -> SettableFuture.create());
    ListenableFuture<String> cs = Futures.transform(bs, (AsyncFunction<String, String>) x -> SettableFuture.create());
    ListenableFuture<String> ds = Futures.transform(cs, Functions.<String>identity());

    ds.cancel(false);
    assertTrue(cs.isDone()); // succeeds

    // jdk 8
    CompletableFuture<String> ac = CompletableFuture.completedFuture("a");
    CompletableFuture<String> bc = ac.thenCompose(x -> new CompletableFuture<>());
    CompletableFuture<String> cc = bc.thenCompose(x -> new CompletableFuture<>());
    CompletableFuture<String> dc = cc.thenApply(Function.identity());

    dc.cancel(false);
    assertTrue(cc.isDone()); // fails
}

(假设每个thenCompose()Futures.transform(x, AsyncFunction) 代表一个异步服务调用。)

我明白为什么 Doug Lee 的研究生大军会这样做。有了分支链,是不是应该全部取消?

CompletableFuture<Z> top = new CompletableFuture<>()
    .thenApply(x -> y(x))
    .thenCompose(y -> z(y));

CompletableFuture<?> aBranch = top.thenCompose(z -> aa(z));
CompletableFuture<?> bBranch = top.thenCompose(z -> bb(z));

...
bBranch.cancel(false);
// should aBranch be canceled now?

我可以使用自定义包装函数解决这个问题,但它会弄乱漂亮的语法。

private <T,U> CompletableFuture<U> transformAsync(CompletableFuture<T> source, Function<? super T,? extends CompletableFuture<U>> transform) {
    CompletableFuture<U> next = source.thenCompose(transform);
    next.whenComplete((x, err) -> next.cancel(false));
    return next;
}

private <T,U> CompletableFuture<U> transform(CompletableFuture<T> source, Function<T,U> transform) {
    CompletableFuture<U> next = source.thenApply(transform);
    next.whenComplete((x, err) -> next.cancel(false));
    return next;
}

// nice syntax I wished worked
CompletableFuture<?> f1 = serviceCall()
        .thenApply(w -> x(w))
        .thenCompose(x -> serviceCall())
        .thenCompose(y -> serviceCall())
        .thenApply(z -> $(z));

// what works, with less readable syntax
CompletableFuture<?> f2 =
        transform(
            transformAsync(
                transformAsync(
                    transform(serviceCall, x(w)),
                    x -> serviceCall()),
                y -> serviceCall()),
            z -> $(z));

【问题讨论】:

    标签: java java-8


    【解决方案1】:

    这取决于你的目标是什么。我认为,让您的中间 CompletableFutures 报告完成状态并不重要,因为您在使用链式构造调用时通常不会注意到。重要的一点是你希望你昂贵的serviceCall()不要被触发。

    一种解决方案可以是:

    CompletableFuture<String> flag=new CompletableFuture<>();
    
    CompletableFuture<String> ac = serviceCall()
      .thenCompose(x -> flag.isCancelled()? flag: serviceCall())
      .thenCompose(x -> flag.isCancelled()? flag: serviceCall());
    ac.whenComplete((v,t)->flag.cancel(false));// don’t chain this call
    

    这使用whenComplete 调用,就像在您的解决方案中一样,但仅在最终的CompletableFuture 上将取消传播到专用的flag 对象。在调用 cancel 之后,下一个 thenCompose 调用将检测到取消并返回取消的未来,因此取消将传播链然后不再调用 compose 或 apply 方法。

    缺点是这不能与thenApply 结合,因为Function 不能返回取消的未来。因此,当异步服务调用完成并与Function 链接时,即使发生取消也会应用该函数。

    解决此问题的替代解决方案是为您的serviceCall 创建一个包装函数,其中包括启动前和完成后的测试:

    CompletableFuture<String> serviceCall(CompletableFuture<String> f) {
        if(f.isCancelled()) return f;
        CompletableFuture<String> serviceCall=serviceCall();
        return serviceCall.thenCompose(x->f.isCancelled()? f: serviceCall);
    }
    

    那么您的用例将如下所示:

    CompletableFuture<String> flag=new CompletableFuture<>();
    
    CompletableFuture<String> ac = serviceCall(flag)
      .thenApply(w->x(w))
      .thenCompose(x -> serviceCall(flag))
      .thenCompose(x -> serviceCall(flag))
      .thenApply(z -> $(z));
    ac.whenComplete((v,t)->flag.cancel(false));
    

    当然,您必须将 &lt;String&gt; 替换为原始 serviceCall() 用于 CompletableFuture&lt;T&gt; 的任何类型参数。

    【讨论】:

    • 对这些方法的警告:ac.isCancelled() 不会是真的,因为某处 CancellationException 被包裹在 CompletionException 中
    • @Michael Deardeuff:它在我的测试设置中有效,我从未处理过CancellationException,所以我没有这样的问题。可能是在调用get 时,但在你的问题中链接时,你永远不会调用get
    • 可能会出现混淆,因为我没有明确说明您应该像在您的问题中那样使用它:构造的 ac 是您应该关心的唯一 CompletableFuture,它是您使用的那个用于检索完成状态和结果以及您在适当时取消的状态;助手flag 仅在构造期间相关。因此,当您想cancel 时,您应该在ac 上调用取消,这当然会立即反映出它已被取消。 (除非它在cancel 调用之前已经完成。
    • 哦,这不会阻止我使用它。需要注意的是,后来者不会在单元测试中尝试。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-31
    • 1970-01-01
    • 1970-01-01
    • 2020-06-07
    • 1970-01-01
    相关资源
    最近更新 更多