【问题标题】:Make Thread usable if it waits for other stuff使线程在等待其他东西时可用
【发布时间】:2021-10-28 05:46:09
【问题描述】:

我有 10 个实现此方法的对象:

public CompletableFuture<TestObject> processAsync(Executor executor){
    return CompletableFuture.supplyAsync(
        () -> DoLongSynchronousRestCall(), //Takes 3 Seconds
        executor
    );
}

我已经实现了一些代码,它遍历所有 10 个对象并检查 processAsync 方法的 completableFuture 是否为 isDone(),然后提取结果。这完全正常,符合预期。

现在是“棘手”的事情。我的 Threadpool 包含 5 个线程,这意味着我可以同时处理 5 个对象,这导致一切都需要 6 秒。

但我想要实现的是5个线程都执行DoLongSynchronousRestCall(),然后他们看到他们需要等待,并直接寻找更多的工作。这意味着一切都应该在 3.4 秒内完成(如果我们假设所有这些东西的开销大约是 0.4 秒)。

这样的事情可能吗?

例如将 Threadexecution 标记为“请在 1 秒后再次查看,同时您可以做更多工作”?

【问题讨论】:

  • 那为什么不做10个线程呢?如果您有 10 个需要并行进行的同步调用,那么它们需要在 10 个线程中完成。这就是同步的意思。
  • "然后他们看到他们需要等待,并直接寻找更多的工作。"这将如何发生?你真的是在等待吗?
  • @matt no iam 目前没有做任何事情,这是我的问题的一部分,如果你们知道这样的事情会是什么样子。
  • 您可以拥有更多线程,尤其是在线程不受 CPU 限制的情况下。当它们出于某种原因等待时,例如调用等待,那么操作系统就有机会安排其他线程运行。
  • 为什么不使用 ExecutorService 来完成这一切呢? Executors 类将方便地为您提供所需类型的服务。也许 Executots.newCachedThreadPool() 是你想要的。

标签: java threadpool


【解决方案1】:

听起来您想在后台线程上运行一堆任务并等待它们全部完成,然后收集它们返回的结果。

执行者

在 Java 5 中添加了 Executors 框架以简化此类工作。

将您的任务定义为RunnableCallable。在您的情况下,Callable 因为您希望返回一个值。

在我们的任务中,我们通过休眠当前线程三秒钟来模拟需要很长时间的工作。为了模拟要返回的结果,我们将当前时刻捕获为 Instant 对象。

Callable < Instant > task = () -> {
    Thread.sleep( Duration.ofSeconds( 3 ).toMillis() );  // Simulating work that takes a long while. 
    return Instant.now();  // Simulating result of work to be returned.
};

收集一堆要在后台线程上执行的任务。在这里,我们多次分配相同的任务对象。您也可以实例化多个任务对象。

List < Callable < Instant > > tasks = new ArrayList <>();  // Collect tasks to be executed. 
int limit = 5;
for ( int i = 0 ; i < limit ; i++ ) { tasks.add( task ); }

通过Executors 实用程序类实例化ExecutorService

您有多种执行器服务实现可供选择,具有多种行为。如果您知道一次要运行少量任务,则可以选择一个缓存线程池,它一次会生成一堆线程。对于大量任务,请选择另一个 ExecutorService 以避免压倒您的机器。

ExecutorService executorService = Executors.newCachedThreadPool();

仅供参考……在未来,Project Loom 可能会将 虚拟线程 (fibers) 引入 Java 以启用数百万个并发线程(如果不是 CPU 密集型的话)。现在可以使用基于早期访问 Java 18 的实验版本。您可以使用 another executor service

ExecutorService executorService = Executors.newVirtualThreadExecutor();

如果您有大量任务,并且尚未使用 Project Loom 技术,您可能需要选择executor service backed by a limited number of threads

ExecutorService executorService = Executors.newFixedThreadPool( 3 );

提交我们要执行的任务集合。提交的每个任务的结果是一个Future。我们将所有预期结果作为Future object.s 的集合来跟踪

List < Future < Instant > > futures = null;
try { futures = executorService.invokeAll( tasks ); } catch ( InterruptedException e ) { e.printStackTrace(); }

开始关闭我们的执行器服务。这会阻止提交更多任务。

executorService.shutdown();

等待所有提交的任务完成。如果任务花费的时间比预期的长,请指定超时以引发异常。

try { executorService.awaitTermination( 1 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }

报告结果。循环每个 Future 对象。查看任务是否被取消。如果没有取消,那么我们知道它一定已经完成,因为我们已经等待 executor 服务关闭了。

for ( Future < Instant > future : futures )
{
    if ( future.isCancelled() )
    {
        System.out.println( "Canceled." );
    } else
    {
        try { System.out.println( future.get() ); } catch ( InterruptedException e ) { e.printStackTrace(); } catch ( ExecutionException e ) { e.printStackTrace(); }
    }
}

运行时,有一对System.out.println 调用围绕我们的执行器服务的执行。您可以看到所有任务都在21 秒后提交,它们都休眠了 3 秒,然后全部在24 秒唤醒并执行。

本次运行在 MacBook Pro(13 英寸,Apple Silicon M1,2020 年)上运行,配备 8 个(4 个性能和 4 个效率)内核,使用早期访问 Java 17。

INFO - Starting execution at 2021-08-28T21:33:21.514429Z
INFO - Ending execution at 2021-08-28T21:33:24.525798Z
2021-08-28T21:33:24.524563Z
2021-08-28T21:33:24.522178Z
2021-08-28T21:33:24.522177Z
2021-08-28T21:33:24.522178Z
2021-08-28T21:33:24.522181Z

线程调度

Your comment 是正确的。 Java 中的阻塞线程不会阻塞 CPU 内核的使用。

Java 的当前实现(至少通过 Java 17)使用主机操作系统线程作为 Java 线程。这意味着哪个线程在哪个 CPU 内核上运行以及运行多长时间的调度由主机操作系统控制。

目前,如果您的 Java 代码阻塞,Java 线程阻塞,因此主机操作系统线程阻塞。线程是否被阻塞,是否继续在 CPU 内核上执行取决于主机操作系统。

请注意,在主机操作系统线程之间切换以在 CPU 内核上执行是相对“昂贵的”,会消耗 CPU 周期的开销,并且可能分配过多的内存。这就是为什么你不应该用太多的线程让你的机器负担过重。与内核大致相同数量的线程是一般准则,但它会根据您正在执行的任务的性质而有所不同。

Project Loom 中“更便宜”的线程

如上所述,Project Loom 中的虚拟线程承诺让 Java 线程之间的阻塞 Java 代码“更便宜”,这意味着更少的内存和 CPU 开销。

Loom 技术中的阻塞虚拟线程将更快、更轻松地切换 CPU 内核以在另一个虚拟线程上工作。这些虚拟线程映射到“真实”主机平台操作系统线程,多对一。这些效率意味着即使是数百万个并发线程在传统硬件上也可能是合理的。

我在这里故意过度简化。有关当前线程技术和 Project Loom 更改的完整详细信息,请参阅 Ron Pressler 和 Project Loom 其他成员最近的演讲和采访。

你说:

我认为如果我为我的线程池分配 5 个线程,那么我会阻塞 5 个 cpu 核心。

如上所述,阻塞的 Java 线程不会阻塞 CPU 内核。主机操作系统可以随时选择在该内核上运行其他线程,无论您的 Java 线程是否被阻塞。 “其他线程”是指 Java 线程或其他应用程序的线程。所以请牢记大局:您的 Java 线程可能会在任何时候暂停任何时间,因为主机操作系统认为该机器上当前的操作条件合适。

但是,在 Java 中,如果您使用由 5 个线程组成的固定大小的线程池支持的执行器服务,并且挂起的任务已提交但尚未启动,并且所有 5 个当前任务都发生阻塞,则不会执行更多工作由该执行程序服务,直到当前阻塞清除。

这是在 Project Loom 下使用虚拟线程的变化:任何阻塞的 virtual 线程都被 JVM(而不是主机操作系统)搁置(“停放”),以便其主机操作系统线程可以立即开始执行共享该“真实”操作系统线程的许多其他虚拟线程之一。

【讨论】:

  • 你的超时想法被打破了。 invokeAll 任务完成后返回。
  • @matt 我已纠正,谢谢。我实际上并没有在自己的工作中使用invokeAll,也没有注意到这一事实。我会更新答案。
  • 我认为 matt explane 如何通过 Excutors 执行线程。但是,也许 Writer 希望在等待期间执行。例如,如果在那种情况下,某个线程在等待 3 秒后调用 someMethod。但是 Writer 需要在等待时间内执行。
  • 感谢您的深入解释,但这已经是我正在做的事情。我只是认为 ThreadCount = "blocked Cpu cores",所以我认为如果我为我的线程池分配 5 个线程,那么我会阻止 5 个 cpu 核心。我昨天想要的是“重用”等待的 cpu 内核,但这不是它的工作方式,操作系统已经在这样做了。所以实际的解决方案是将线程数设置为 20 并按预期工作
【解决方案2】:

我认为您正在等待的是 Project Loom:

https://wiki.openjdk.java.net/display/loom/Main

http://cr.openjdk.java.net/~rpressler/loom/loom/sol1_part1.html

https://inside.java/tag/loom

这很快就会出现在 JDK 中,现在可以在 EA(抢先体验)版本中进行测试和反馈。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-21
    • 2019-08-27
    • 1970-01-01
    • 2016-09-12
    相关资源
    最近更新 更多