【问题标题】:Return a CompletableFuture without exposing executor thread返回一个 CompletableFuture 而不暴露 executor 线程
【发布时间】:2019-10-24 06:42:28
【问题描述】:

我在库中公开了一个方法,该方法返回一个 CompletableFuture。该方法的计算发生在单线程执行器上,这是我的瓶颈,因此我不希望任何后续工作发生在同一个线程上。

如果我使用返回“supplyAsync”结果的简单方法,我会将我宝贵的线程暴露给调用者,他们可能正在添加同步操作(例如通过 thenAccept),这可能会在该线程上占用一些 CPU 时间。

复制如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CfPlayground {
    private ExecutorService preciousExecService = Executors.newFixedThreadPool(1);

    CfPlayground() {}

    private static void log(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + msg);
    }

    CompletableFuture<String> asyncOp(String param) {
        return CompletableFuture.supplyAsync(() -> {
            log("In asyncOp");
            return "Hello " + param;
        }, preciousExecService);
    }

    void syncOp(String salutation) {
        log("In syncOp: " + salutation);
    }

    void run() {
        log("run");
        asyncOp("world").thenAccept(this::syncOp);
    }

    public static void main(String[] args) throws InterruptedException {
        CfPlayground compFuture = new CfPlayground();
        compFuture.run();
        Thread.sleep(500);
        compFuture.preciousExecService.shutdown();
    }
}

这确实打印:

[main] run
[pool-1-thread-1] In asyncOp
[pool-1-thread-1] In syncOp: Hello world

我发现的一个解决方案是引入另一个 Executor,并在返回 CompletableFuture 之前添加一个 no-op thenApplyAsync 与该 executor

    CompletableFuture<String> asyncOp(String param) {
        return CompletableFuture.supplyAsync(() -> {
            log("In asyncOp");
            return "Hello " + param;
        }, preciousExecService).thenApplyAsync(s -> s, secondExecService);
    }

这可行,但感觉不是特别优雅 - 有没有更好的方法来做到这一点?

【问题讨论】:

  • 您的解决方案似乎是最好的解决方案。您可以传递ForkJoinPool.commonPool(),而不是创建第二个 ExecutorService,如果没有给出明确的 Executor,这是 CompletableFutures 默认使用的。 (我不完全清楚没有显式 Executor 参数的方法是否总是使用 commonPool,或者总是使用当前 CompletableFuture 的 Executor。)
  • 感谢@VGR 的建议,它可以代替第二个执行者工作。为了完整起见,我的理解是,没有显式执行程序的异步回调将在 ForkJoinPool.commonPool() 上运行,如果 CompletableFuture 立即完成,非异步回调将在调用线程上运行,或者在完成请求的线程上运行(例如我的单线程执行器)否则。有更准确完整的解释here

标签: java concurrency completable-future


【解决方案1】:

没有功能可以将您的完成与相关操作的执行分开。当链接依赖action的线程已经完成注册并且你的executor线程完成future时,如果没有给出其他executor,那么其他线程应该执行依赖action?

您将另一个动作与不同的执行者链接的方法似乎是您能得到的最好的方法。但是,重要的是要注意,在异常完成的情况下,异常会在不评估传递给 thenApply 的函数的情况下传播。如果调用者链接了 whenCompletehandleexceptionally 之类的操作,则此异常传播可能再次导致线程暴露。

另一方面,您不需要指定辅助执行器,因为您可以使用不带 executor 参数的 async 方法来获取默认(通用 Fork/Join)池。

所以链接.whenCompleteAsync((x,y) -&gt; {}) 是迄今为止解决问题的最佳方法。

【讨论】:

  • 感谢@Holger 的回复,以及关于异常传播影响的要点!
【解决方案2】:

您可以更改方法签名以返回 Future 而不是 CompletableFuture

Future<String> asyncOp(String param) {
    return CompletableFuture.supplyAsync(() -> {
        log("In asyncOp");
        return "Hello " + param;
    }, preciousExecService);
}

那样,run() 方法会抛出编译错误:

void run() {
    log("run");
    asyncOp("world").thenAccept(this::syncOp);
}

调用者仍然可以将返回的 Future 转换回 CompletableFuture,但这会严重滥用您的 API,而且不会偶然发生。

【讨论】:

  • 感谢@Dorian 的回复。同意返回 Future 将防止调用者“窃取”内部执行程序,但我正在尝试使用 CompletableFuture 让调用者使用 Java-8 样式的回调,而不是必须等待或阻塞 Future。说得通?如果我在这里遗漏了什么,请告诉我。
  • 嗯,从您的帖子中我不明白您想这样做。在这种情况下,我的解决方案将不起作用。
猜你喜欢
  • 2017-10-28
  • 2015-08-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-12
相关资源
最近更新 更多