织机项目
如果您希望在阻塞的线程代码(而不是 CPU 绑定代码)上获得最大性能,请使用Project Loom 中提供的虚拟线程(纤程)。初步构建是 available now,基于早期访问 Java 16。
虚拟线程
虚拟线程可以大大更快,因为一个虚拟线程在被阻塞时被“停放”,被搁置一旁,所以另一个虚拟线程可以取得进展。这对于阻塞数百万线程的任务非常有效。
放弃溪流方法。只需将每个输入发送到一个虚拟线程。
完整示例代码
让我们为Answer 和Reply 定义类,我们的输入和输出。我们将使用 Java 16 的新特性 record 作为定义不可变数据驱动类的缩写方式。编译器隐式创建构造函数、getter、equals & hashCode 和 toString 的默认实现。
public record Answer (String text)
{
}
…和:
public record Reply (String text)
{
}
定义我们的任务以提交给执行器服务。我们编写了一个名为ReplierTask 的类,它实现了Runnable(具有run 方法)。
在run 方法中,我们休眠当前线程以模拟等待对数据库、文件系统和/或远程服务的调用。
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
public class ReplierTask implements Runnable
{
private Answer answer;
ConcurrentMap < Answer, Reply > map;
public ReplierTask ( Answer answer , ConcurrentMap < Answer, Reply > map )
{
this.answer = answer;
this.map = map;
}
private Reply getReplyForAnswerCombination ( Answer answer )
{
// Simulating a call to some service to produce a `Reply` object.
try { Thread.sleep( Duration.ofSeconds( 1 ) ); } catch ( InterruptedException e ) { e.printStackTrace(); } // Simulate blocking to wait for call to service or db or such.
return new Reply( UUID.randomUUID().toString() );
}
// `Runnable` interface
@Override
public void run ( )
{
System.out.println( "`run` method at " + Instant.now() + " for answer: " + this.answer );
Reply reply = this.getReplyForAnswerCombination( this.answer );
this.map.put( this.answer , reply );
}
}
最后,一些代码来完成这项工作。我们创建了一个名为 Mapper 的类,其中包含一个 main 方法。
我们通过填充Answer 对象数组来模拟一些输入。我们创建一个空的ConcurrentMap 来收集结果。我们将每个Answer 对象分配给一个新线程,在该线程中我们调用一个新的Reply 对象并将Answer/Reply 对存储为映射中的一个条目。
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Mapper
{
public static void main ( String[] args )
{
System.out.println("Runtime.version(): " + Runtime.version() );
System.out.println("availableProcessors: " + Runtime.getRuntime().availableProcessors());
System.out.println("maxMemory: " + Runtime.getRuntime().maxMemory() + " | maxMemory/(1024*1024) -> megs: " +Runtime.getRuntime().maxMemory()/(1024*1024) );
Mapper app = new Mapper();
app.demo();
}
private void demo ( )
{
// Simulate our inputs, a list of `Answer` objects.
int limit = 10_000;
List < Answer > answers = new ArrayList <>( limit );
for ( int i = 0 ; i < limit ; i++ )
{
answers.add( new Answer( String.valueOf( i ) ) );
}
// Do the work.
Instant start = Instant.now();
System.out.println( "Starting work at: " + start + " on count of tasks: " + limit );
ConcurrentMap < Answer, Reply > results = new ConcurrentHashMap <>();
try
(
ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
// Executors.newFixedThreadPool( 5 )
// Executors.newFixedThreadPool( 10 )
// Executors.newFixedThreadPool( 1_000 )
// Executors.newVirtualThreadExecutor()
)
{
for ( Answer answer : answers )
{
ReplierTask task = new ReplierTask( answer , results );
executorService.submit( task );
}
}
// At this point the flow-of-control blocks until all submitted tasks are done.
// The executor service is automatically closed by this point as well.
Duration elapsed = Duration.between( start , Instant.now() );
System.out.println( "results.size() = " + results.size() + ". Elapsed: " + elapsed );
}
}
我们可以用平台线程池更改Executors.newVirtualThreadExecutor(),以与虚拟线程进行比较。让我们在 Mac mini Intel 上尝试 5、10 和 1,000 个平台线程池,macOS Mojave 具有 6 个真正的内核、无超线程、32 gigs 内存和 OpenJDK 特殊构建版本 16-loom+9-316 分配的 maxMemory 8 场演出。
| 10,000 tasks at 1 second each |
Total elapsed time |
| 5 platform threads |
half-hour — PT33M29.755792S |
| 10 platform threads |
quarter-hour — PT16M43.318973S |
| 1,000 platform threads |
10 seconds — PT10.487689S |
| 10,000 platform threads |
Error… unable to create native thread: possibly out of memory or process/resource limits reached |
| virtual threads |
Under 3 seconds — PT2.645964S |
注意事项
警告:Project Loom 是实验性的,可能会发生变化,尚未用于生产用途。该团队现在要求人们提供反馈。
警告:视频编码等 CPU 密集型任务应坚持使用平台/内核线程,而不是虚拟线程。大多数执行阻塞操作(如 I/O)的常见代码,例如访问文件、日志记录、访问数据库或进行网络调用,都可能会通过虚拟线程获得巨大的性能提升。
警告:您必须有足够的内存供您的许多甚至所有任务同时运行。如果没有足够的可用内存,您必须采取额外的步骤来限制虚拟线程。