【问题标题】:Can one thread block complete ForkJoinPool一个线程可以阻塞完成 ForkJoinPool
【发布时间】:2020-11-10 09:01:34
【问题描述】:

我正在阅读https://dzone.com/articles/think-twice-using-java-8

介于两者之间的某个地方表明

问题是所有并行流都使用公共的fork-join线程池,如果你提交一个长时间运行的任务,你实际上阻塞了池中的所有线程。

我的问题是 - 池中的其他线程不应该在不等待长时间运行的任务的情况下完成吗?还是说我们是否并行创建两个并行流?

【问题讨论】:

  • 我认为作者是说长时间运行的任务可能正在使用池中的所有线程,因此其他任务无法运行。任务不是单个可运行的,它是一个并行操作。
  • @NathanHughes 但这些线程不会来自常见的 ForkJoinPool。即使是,为什么其他线程会被阻塞在同一个池中?或者也许作者说线程将在终端操作时等待。这令人困惑!
  • ForkJoinPool 用于小型任务,不适用于长时间运行的任务。如果你想创建一个长时间运行的任务(考虑到它不会消耗很多 cpu),那么在一个单独的 Executor 中运行 .parallel()。所以是的,上述陈述是绝对正确的。

标签: multithreading java-8 java-stream forkjoinpool


【解决方案1】:

流操作不会阻塞池中的线程,它会利用它们。根据工作负载拆分,可能所有线程都忙于处理首先开始的 Stream 操作,因此它们无法为另一个 Stream 操作拾取工作负载。在这种情况下,这篇文章似乎错误地使用了“阻止”一词。

值得注意的是,Stream API 和默认实现是为不等待外部事件(阻塞线程)的 CPU 密集型任务设计的。如果您以这种方式使用它,那么哪个任务使线程忙于整体吞吐量并不重要。但是,如果您同时处理不同的请求,并且希望在工作线程分配中具有某种公平性,那就行不通了。

如果您继续阅读本文,您会发现他们创建了一个示例,假设错误使用 Stream API,具有真正的阻塞操作,甚至调用第一个示例broken,尽管他们将它不必要地用引号引起来。在这种情况下,错误不是使用并行 Stream,而是将其用于阻塞操作。

这样的并行流操作可以“阻塞所有其他使用并行流的任务”也是不正确的。要进行另一个并行 Stream 操作,您必须至少有一个可运行的线程来启动 Stream 操作。由于该启动线程将有助于 Stream 处理,因此始终至少有一个参与线程。因此,如果公共池的所有线程都在一个 Stream 操作上工作,它可能会降低其他并行 Stream 操作的性能,但不会导致它们停止。

例如,如果您使用以下测试程序

long t0 = System.nanoTime();
new Thread(() -> {
    Stream.generate(() -> {
        long missing = TimeUnit.SECONDS.toNanos(3) + t0 - System.nanoTime();
        if(missing > 0) {
            System.out.println("blocking "+Thread.currentThread().getName());
            LockSupport.parkNanos(missing);
        }
        return "result";
    }).parallel().limit(100).forEach(result -> {});
    System.out.println("first (blocking) operation finished");
}).start();
for(int i = 0; i< 4; i++) {
    new Thread(() -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        System.out.println(Thread.currentThread().getName()
                          +" starting another parallel Stream");
        Object[] threads =
            Stream.generate(() -> Thread.currentThread().getName())
                .parallel().limit(100).distinct().toArray();
        System.out.println("finished using "+Arrays.toString(threads));
    }).start();
}

它可能会打印出类似的东西

blocking ForkJoinPool.commonPool-worker-5
blocking ForkJoinPool.commonPool-worker-13
blocking Thread-0
blocking ForkJoinPool.commonPool-worker-7
blocking ForkJoinPool.commonPool-worker-15
blocking ForkJoinPool.commonPool-worker-11
blocking ForkJoinPool.commonPool-worker-9
blocking ForkJoinPool.commonPool-worker-3
Thread-2 starting another parallel Stream
Thread-4 starting another parallel Stream
Thread-1 starting another parallel Stream
Thread-3 starting another parallel Stream
finished using [Thread-4]
finished using [Thread-2]
finished using [Thread-3]
finished using [Thread-1]
first (blocking) operation finished

(细节可能有所不同)

但是,创建启动线程(例如,那些接受外部请求的线程)的线程管理与公共池之间可能存在冲突。但是,如前所述,如果您希望在多个独立操作之间保持公平,那么并行 Stream 操作不是正确的工具。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多