【问题标题】:java parallelStreams on different machines不同机器上的java parallelStreams
【发布时间】:2021-01-06 01:51:45
【问题描述】:

我有一个函数,它在 forEach 中使用 parallelStream 迭代列表,然后调用一个 API,并将该项目作为参数。然后我将结果存储在 hashMap 中。

    try {
            return answerList.parallelStream()
                    .map(answer -> getReplyForAnswerCombination(answer))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        } catch (final NullPointerException e) {
            log.error("Error in generating final results.", e);
            return null;
        }

当我在笔记本电脑 1 上运行它时,需要 1 小时。 但在笔记本电脑 2 上,需要 5 个小时。

做一些基础研究后,我发现并行流使用默认的 ForkJoinPool.commonPool,默认情况下它的线程数比处理器少一个。

笔记本电脑 1 和笔记本电脑 2 具有不同的处理器。

  • 有没有办法找出可以在笔记本电脑 1 和笔记本电脑 2 上并行运行的流的数量?
  • 我可以使用here 给出的建议安全地增加笔记本电脑2 中的并行流数量吗?
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

【问题讨论】:

  • 可以运行多少个流取决于您的代码,无论是完全耗尽 CPU 还是停滞在其他线程可以机会使用的 IO 上等等。我建议只创建一个示例工作负载分钟完成,然后调整线程数,直到找到正确的平衡。理想情况下,您的代码可以在等待 IO 时处理停顿,而下一个线程不会破坏缓存,在这种情况下,可用线程数加一是一个很好的起点。

标签: java concurrency parallel-processing java-stream


【解决方案1】:

织机项目

如果您希望在阻塞的线程代码(而不是 CPU 绑定代码)上获得最大性能,请使用Project Loom 中提供的虚拟线程(纤程)。初步构建是 available now,基于早期访问 Java 16。

虚拟线程

虚拟线程可以大大更快,因为一个虚拟线程在被阻塞时被“停放”,被搁置一旁,所以另一个虚拟线程可以取得进展。这对于阻塞数百万线程的任务非常有效。

放弃溪流方法。只需将每个输入发送到一个虚拟线程。

完整示例代码

让我们为AnswerReply 定义类,我们的输入和输出。我们将使用 Java 16 的新特性 record 作为定义不可变数据驱动类的缩写方式。编译器隐式创建构造函数、getter、equals & hashCodetoString 的默认实现。

public record Answer (String text)
{
}

…和:

public record Reply (String text)
{
}

定义我们的任务以提交给执行器服务。我们编写了一个名为ReplierTask 的类,它实现了Runnable(具有run 方法)。

run 方法中,我们休眠当前线程以模拟等待对数据库、文件系统和/或远程服务的调用。

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

public class ReplierTask implements Runnable
{
    private Answer answer;
    ConcurrentMap < Answer, Reply > map;

    public ReplierTask ( Answer answer , ConcurrentMap < Answer, Reply > map )
    {
        this.answer = answer;
        this.map = map;
    }

    private Reply getReplyForAnswerCombination ( Answer answer )
    {
        // Simulating a call to some service to produce a `Reply` object.
        try { Thread.sleep( Duration.ofSeconds( 1 ) ); } catch ( InterruptedException e ) { e.printStackTrace(); }  // Simulate blocking to wait for call to service or db or such.
        return new Reply( UUID.randomUUID().toString() );
    }

    // `Runnable` interface
    @Override
    public void run ( )
    {
        System.out.println( "`run` method at " + Instant.now() + " for answer: " + this.answer );
        Reply reply = this.getReplyForAnswerCombination( this.answer );
        this.map.put( this.answer , reply );
    }
}

最后,一些代码来完成这项工作。我们创建了一个名为 Mapper 的类,其中包含一个 main 方法。

我们通过填充Answer 对象数组来模拟一些输入。我们创建一个空的ConcurrentMap 来收集结果。我们将每个Answer 对象分配给一个新线程,在该线程中我们调用一个新的Reply 对象并将Answer/Reply 对存储为映射中的一个条目。

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Mapper
{
    public static void main ( String[] args )
    {
        System.out.println("Runtime.version(): " + Runtime.version() );
        System.out.println("availableProcessors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("maxMemory: " + Runtime.getRuntime().maxMemory() + " | maxMemory/(1024*1024) -> megs: " +Runtime.getRuntime().maxMemory()/(1024*1024)  );
        Mapper app = new Mapper();
        app.demo();
    }

    private void demo ( )
    {
        // Simulate our inputs, a list of `Answer` objects.
        int limit = 10_000;
        List < Answer > answers = new ArrayList <>( limit );
        for ( int i = 0 ; i < limit ; i++ )
        {
            answers.add( new Answer( String.valueOf( i ) ) );
        }

        // Do the work.
        Instant start = Instant.now();
        System.out.println( "Starting work at: " + start + " on count of tasks: " + limit );
        ConcurrentMap < Answer, Reply > results = new ConcurrentHashMap <>();
        try
                (
                        ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
                        // Executors.newFixedThreadPool( 5 )
                        // Executors.newFixedThreadPool( 10 )
                        // Executors.newFixedThreadPool( 1_000 )
                        // Executors.newVirtualThreadExecutor()
                )
        {
            for ( Answer answer : answers )
            {
                ReplierTask task = new ReplierTask( answer , results );
                executorService.submit( task );
            }
        }
        // At this point the flow-of-control blocks until all submitted tasks are done.
        // The executor service is automatically closed by this point as well.
        Duration elapsed = Duration.between( start , Instant.now() );
        System.out.println( "results.size() = " + results.size() + ". Elapsed: " + elapsed );
    }
}

我们可以用平台线程池更改Executors.newVirtualThreadExecutor(),以与虚拟线程进行比较。让我们在 Mac mini Intel 上尝试 5、10 和 1,000 个平台线程池,macOS Mojave 具有 6 个真正的内核、无超线程、32 gigs 内存和 OpenJDK 特殊构建版本 16-loom+9-316 分配的 maxMemory 8 场演出。

10,000 tasks at 1 second each Total elapsed time
5 platform threads half-hour — PT33M29.755792S
10 platform threads quarter-hour — PT16M43.318973S
1,000 platform threads 10 seconds — PT10.487689S
10,000 platform threads Error…
unable to create native thread: possibly out of memory or process/resource limits reached
virtual threads Under 3 seconds — PT2.645964S

注意事项

警告:Project Loom 是实验性的,可能会发生变化,尚未用于生产用途。该团队现在要求人们提供反馈。

警告:视频编码等 CPU 密集型任务应坚持使用平台/内核线程,而不是虚拟线程。大多数执行阻塞操作(如 I/O)的常见代码,例如访问文件、日志记录、访问数据库或进行网络调用,都可能会通过虚拟线程获得巨大的性能提升。

警告:您必须有足够的内存供您的许多甚至所有任务同时运行。如果没有足够的可用内存,您必须采取额外的步骤来限制虚拟线程。

【讨论】:

  • 基本上,您提供的是使用ExecutorService 而不是 Stream API 的解决方案。即使没有使用普通线程池执行器的 Project Loom,此解决方案也可以正常工作。 OP 展示了一个尝试使用 20 个线程的示例。即使使用 1000 个线程,也不一定需要虚拟线程才能流畅运行。提到 Project Loom 作为即将推出的解决方案,即使使用百万线程也可以扩展是一个很好的补充,但没有必要让您的解决方案依赖它。
  • @Holger 作者关心的是性能,总共需要多长时间才能完成所有任务。如果有问题的代码阻塞,您将看到使用虚拟线程的总时间存在巨大差异。很可能是几分钟,而不是问题中引用的小时数。您必须看到实际的虚拟线程才能相信它。所以,是的,虚拟线程是我回答的重点,而不是一般的执行器服务。
  • @Holger 你的评论促使我做一些实验。我更改了我的示例代码,在每个任务中添加了 1 秒的睡眠时间来模拟一些阻塞活动。然后我运行各种执行器服务来比较虚拟线程和平台线程。虚拟线程运行此示例的速度至少比您建议的 1,000 个平台线程快 3 倍。使用较小的线程池,性能差距要大得多。在我更新的答案中查看新代码和结果表。
  • 任务执行实际工作而不是睡觉,结果肯定会不那么引人注目,但这不是重点。您的实验表明,您可以轻松更改执行程序,因此即使普通线程池的性能可能比虚拟线程差,但它的性能优于 OP 的方法,并且可能在今天就足够了。所以很遗憾,您给人的印象是,如果没有 Loom,您的答案就无法使用,而 OP 必须等待它成为生产功能。您不必要地限制了自己答案的有用性。
  • @DuncG 看我的第一句话。看看我的第二个警告。我在我的答案中解释说,当与阻塞的代码一起使用时,虚拟线程会发光。对于纯粹受 CPU 限制的任务,请使用平台/内核线程。最常见的业务应用程序代码涉及日志记录、数据库查询、存储 I/O 和网络调用的阻塞。对于那些,使用虚拟线程。所以我的测试没有被操纵。对Thread.sleep 的调用模拟了这种常见的阻塞。 “LOOM 几乎可以同时在所有线程上运行”这一事实正是我的观点。
【解决方案2】:

设置java.util.concurrent.ForkJoinPool.common.parallelism 将影响可用于使用ForkJoinPool 的操作的线程,例如Stream.parallel()。但是:您的任务是否使用更多线程取决于流中的项目数,以及是否需要更少的时间来运行取决于每个任务的性质和您的可用处理器。

这个测试程序显示了用一个简单的任务改变这个系统属性的效果:

public static void main(String[] args) {
    ConcurrentHashMap<String,String> threads = new ConcurrentHashMap<>();
    int max     = Integer.parseInt(args[0]);
    boolean parallel = args.length < 2 || !"single".equals(args[1]);
    int [] arr = IntStream.range(0, max).toArray();

    long start = System.nanoTime();

    IntStream stream = Arrays.stream(arr);
    if (parallel)
        stream = stream.parallel();
    stream.forEach(i -> {
        threads.put("hc="+Thread.currentThread().hashCode()+" tn="+Thread.currentThread().getName(), "value");
    });
    long end = System.nanoTime();

    System.out.println("parallelism: "+System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
    System.out.println("Threads: "+threads.keySet());
    System.out.println("Array size: "+arr.length+" threads used: "+threads.size()+" ms="+TimeUnit.NANOSECONDS.toMillis(end-start));
}

添加更多线程不一定会加快速度。以下是一些来自测试运行的示例,用于计算使用的线程数。它可以帮助您为自己的任务确定最佳方法,包含在 getReplyForAnswerCombination() 中。

java -cp example.jar -Djava.util.concurrent.ForkJoinPool.common.parallelism=1000 App 100000
Array size: 100000 threads used: 37

java -cp example.jar -Djava.util.concurrent.ForkJoinPool.common.parallelism=50 App  100000
Array size: 100000 threads used: 20

java -cp example.jar APP 100000 single
Array size: 100000 threads used: 1

我建议您在@Basil Bourque 答案中查看线程池(有或没有 LOOM),并且 ForkJoinPool 构造函数的 JDK 源代码也有关于此系统属性的一些详细信息。

private ForkJoinPool(byte forCommonPoolOnly)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-21
    • 2015-05-30
    • 2022-01-11
    • 2015-11-05
    相关资源
    最近更新 更多