【问题标题】:WorkStealingPool and ThreadPoolExecutor yields different results when used with CompletableFutureWorkStealingPool 和 ThreadPoolExecutor 与 CompletableFuture 一起使用时会产生不同的结果
【发布时间】:2018-08-25 17:57:17
【问题描述】:

执行以下代码时:

ExecutorService executorService = Executors.newWorkStealingPool(20);

Function<String, CompletableFuture<String>> requestTask =
        url -> CompletableFuture.supplyAsync(() -> {
                    System.out.println("Request " + requestCount++ + " was sent");
                    HttpClient.get(url);
                    return url;
                    }, executorService);

Function<String, String> extractName = s -> s.replaceAll("(https|http|://|\\.com|www\\.|\\.io)", "");

CompletableFuture[] futures = urls.stream() // urls list contains 14 urls
        .map(requestTask)
        .map(future -> future.thenApply(extractName))
        .map(future -> future.thenAccept(System.out::println))
        .toArray(CompletableFuture[]::new);

CompletableFuture.allOf(futures);
executorService.shutdown();

结果如下:

Request 0 was sent
Request 1 was sent
Request 2 was sent
Request 3 was sent
Request 4 was sent
Request 5 was sent
Process finished with exit code 0

但是,当Executors.newWorkStealingPool(20) 替换为Executors.newFixedThreadPool(20) 时,所有请求都会被发送。这种行为的原因是什么?

【问题讨论】:

    标签: java concurrency executorservice completable-future


    【解决方案1】:

    并非所有请求都已发送,因为(剧透警告!)JVM 只是简单地终止。

    如您所知,the conditions for JVM termination 是:

    发生以下任一情况:

    • Runtime 类的 exit 方法已被调用,安全管理器已允许执行退出操作。
    • 不是守护线程的所有线程都已死亡,要么从调用 run 方法返回,要么抛出传播到 run 方法之外的异常。

    显然不是第一种,所以肯定是第二种。

    首先要注意的是让你的main()方法退出:

    • 您调用CompletableFuture.allOf(),但您不对结果执行任何操作,因此它不会阻塞(没有join() 调用);
    • 调用ExecutorService.shutDown() 只会告诉执行器关闭,它不会等待。

    最初,主线程是唯一的非守护线程,所以这应该足以让 JVM 退出。但这就是 2 个执行者有所作为的地方:

    • newFixedThreadPool()ThreadPoolExecutor 实现,它使用Executors.defaultThreadFactory(),创建非守护线程;
    • newWorkStealingPool()ForkJoinPool 实现,它在它创建的所有线程上调用 setDaemon(true)¹。

    不幸的是,它没有记录在案,但基本上,这归结为Why does the following application terminate immediately when using ForkJoinPool, but not when I use ThreadPoolExecutor?

    因此,您的问题有两种可能的解决方案:

    • allOf() 之后致电join()
    • shutdown()之后在执行器上调用awaitTermination()

    ¹As noted by teppic in the comments,这在 Java 8 中没有记录,但 it is now since Java 9

    【讨论】:

    • “你调用 CompletableFuture.allOf() 但你没有对结果做任何事情,所以它不会阻塞(没有 join() 调用)”足以回答。谢谢=)
    • 基本上我也是这么想的,但是你把它写到最后。很好的解释!
    • 虽然ForkJoinPool 中线程的守护进程性质未在 JDK 8 中记录,但它在 JDK 9+ 中 ForkJoinPool 的类 javadoc 中。
    猜你喜欢
    • 1970-01-01
    • 2010-09-11
    • 1970-01-01
    • 1970-01-01
    • 2015-03-08
    • 1970-01-01
    • 1970-01-01
    • 2023-04-06
    • 1970-01-01
    相关资源
    最近更新 更多