【问题标题】:Which is faster? Less work in more runnables, or more work in less runnables? (ExecutorService)哪个更快?在更多的可运行文件中做更少的工作,还是在更少的可运行文件中做更多的工作? (执行服务)
【发布时间】:2015-01-18 03:11:05
【问题描述】:

我试图弄清楚如何从多线程应用程序中获得最大性能。
我有一个这样创建的线程池:

ExecutorService executor = Executors.newFixedThreadPool(8); // I have 8 CPU cores.  

我的问题是,我应该将工作分成 8 个可运行/可调用对象,这与线程池中的线程数相同,还是应该将其分成 1000000 个可运行对象/可调用对象?

for (int i = 0; i < 1000000; i++) 
{
    Callable<Long> worker = new MyCallable();  // Each worker does little work.
    Future<Long> submit = executor.submit(worker);
}

long sum = 0;

for (Future<Long> future : list) 
    sum += future.get();  // Much more overhead from the for loops

for (int i = 0; i < 8; i++) 
{
    Callable<Long> worker = new MyCallable();  // Each worker does much more work.
    Future<Long> submit = executor.submit(worker);
}

long sum = 0;

for (Future<Long> future : list) 
    sum += future.get();  // Negligible overhead from the for loops

划分为 1000000 个可调用对象对我来说似乎更慢,因为实例化所有这些可调用对象并在 for 循环中从它们收集结果会产生开销。另一方面,如果我有 8 个可调用对象,则此开销可以忽略不计。而且由于我只有 8 个线程,我不能同时运行 1000000 个可调用对象,因此没有性能提升。

我是对还是错?

顺便说一句,我可以测试这些情况,但操作非常简单,我猜编译器意识到了这一点并进行了一些优化。所以结果可能会产生误导。我想知道哪种方法更适合图像处理应用。

【问题讨论】:

  • 第二种方法更好恕我直言,开销也是用户级线程的一个因素
  • 如果您将 1000000 个工作单元划分为 8 个可运行文件,您最终不会得到某种队列,其中每个可运行文件都有 N/8 个项目吗?这与一开始就拥有所有这些 N 个可运行文件不一样吗(除了你现在有一个两层工作队列)?
  • 不是真正的两层,因为我的整个工作都是 N*W。 (N 是 runnable 的数量,W 是每个 runnable 所做的工作。)如果我将 N 减少到 8,我将相应地增加 W。我仍然可以在一个可运行的整体中完成 W 数量的工作。但是如果我的 N 是 1000000,我必须创建那么多可运行对象并从所有这些可运行对象中收集结果。
  • @Utku:那是两层。您可以为每个工作单元创建一个 Runnable,并且只有一个队列(由 ExecutorService 为您管理)。

标签: java multithreading performance concurrency threadpool


【解决方案1】:

这个问题没有直接的答案,因为它取决于很多东西,比如你的代码、应用程序 loigc、最大值、可能的并发性、硬件等。

但在考虑并发性时,您应该考虑以下事项,

  1. 每个可运行对象都需要一个堆栈,该堆栈对于该线程来说是私有的,因此如果您创建较大的编号。线程中的线程内存消耗超过实际应用程序使用量

  2. 线程应该执行独立且并行的任务。

    找出可以在没有任何依赖关系的情况下实际并行执行的代码补丁,否则线程将无济于事

  3. 什么是硬件配置?

    您可以实现的最大并发执行线程数等于总数。 cpu 核心数。如果你少了没有。核心和巨大的没有。线程然后切换任务比实际线程更活跃(使用cpu)。这会严重影响性能

总而言之,您的第二种方法对我来说看起来不错,但如果可能的话,您可以找到更多并行性,您可以将其扩展到 20-30。

【讨论】:

  • 请注意,问题不在于改变线程数。这里固定为 8。这 1000000 个 Runnables不需要需要自己的堆栈空间。
  • 为混淆道歉,我不是要求修改否。线程,我只是陈述了在决定否时应该考虑的考虑因素。线程数及其对性能的影响。
【解决方案2】:

也许这段代码有帮助。它将使用分叉连接池计算斐波那契数。使用 fork-join 我们可以递归地细分问题并组合每个递归级别的结果。从理论上讲,我们可以递归到 fork-join 池中的 fib(0),但这将是低效的。因此,我们引入了一个递归限制,我们停止细分任务并计算当前任务中的其余部分。此代码将记录 fib(x) 所用的时间,并计算 n 到 x 的每个 fib(n) 的单线程时间。对于每个递归限制,它将测量平均创建了多少任务以及每个任务运行了多长时间。

通常,最佳点是大小超过 1µs 的任务,但是我们这里的简单斐波那契任务几乎不需要内存/缓存。对于数据密集且缓存污染较高的任务,切换成本更高,并发任务可能会污染共享缓存。

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class FibonacciFork extends RecursiveTask<Long> {

    private static final long serialVersionUID = 1L;

    public FibonacciFork( long n) {
        super();
        this.n = n;
    }

    static ForkJoinPool fjp = new ForkJoinPool( Runtime.getRuntime().availableProcessors());

    static long fibonacci0( long n) {
        if ( n < 2) {
            return n;
        }
        return fibonacci0( n - 1) + fibonacci0( n - 2);
    }

    static int  rekLimit = 8;
    private static long stealCount;
    long    n;
    private long forkCount;
    private static AtomicLong forks = new AtomicLong( 0);

    static class Result {
        long    durMS;
        int rekLimit;
    }

    public static void main( String[] args) {

        int fiboArg = 49;
        BenchLogger.sysinfo( "Warmup");
        long    singleNS[] = getSingleThreadNanos( 20, 5e9);
        BenchLogger.sysinfo( "Warmup complete");
        singleNS = getSingleThreadNanos( fiboArg, 1e9);
        BenchLogger.sysinfo( "Single Thread Times complete");
        Result[] results = new Result[ fiboArg + 1];
        for ( int rekLimit = 2;  rekLimit <= fiboArg;  rekLimit++) {
            results[ rekLimit] = new Result();
            runWithRecursionLimit( rekLimit, fiboArg, singleNS[ rekLimit], results[ rekLimit]);
        }
        System.out.println( "CSV results for Fibo " + fiboArg + "\n" + "RekLimit\t" + "Jobs ns\t" + "time ms");
        for ( int rekLimit = 2;  rekLimit <= fiboArg;  rekLimit++) {
            System.out.println( rekLimit + "\t" + singleNS[ rekLimit] + "\t" + results[ rekLimit].durMS);
        }
    }

    private static long[] getSingleThreadNanos( final int n, final double minRuntimeNS) {
        final long timesNS[] = new long[ n + 1];
        ExecutorService es = Executors.newFixedThreadPool( Math.max( 1, Runtime.getRuntime().availableProcessors() / 8));
        for ( int i = 2;  i <= n;  i++) {
            final int arg = i;
            Runnable runner = new Runnable() {
                @Override
                public void run() {
                    long    start = System.nanoTime();
                    long result = fibonacci0( arg);
                    long    end = System.nanoTime();
                    double  durNS = end - start;
                    long        ntimes = 1;
                    double fact = 1;
                    while ( durNS < minRuntimeNS) {
                        long    oldNTimes = ntimes;
                        if ( durNS > 0) {
                            ntimes = Math.max( 1, ( long) ( oldNTimes * fact * minRuntimeNS / durNS));
                        } else {
                            ntimes *= 2;
                        }
                        start = System.nanoTime();
                        for ( long i = 0;  i < ntimes;  i++) {
                            result = fibonacci0( arg);
                        }
                        end = System.nanoTime();
                        durNS = end - start;
                        fact *= 1.1;
                    }
                    timesNS[ arg] = ( long) ( durNS / ntimes);
                    System.out.println( "Single Fib(" + arg + ")=" + result + " in " + ( timesNS[ arg] / 1e6) + "ms (" + ntimes + " loops in " + (durNS / 1e6)
                            + " ms)");
                }
            };
            es.execute( runner);
        }
        es.shutdown();
        try {
            es.awaitTermination( 1, TimeUnit.HOURS);
        } catch ( InterruptedException e) {
            BenchLogger.sysinfo( "Single Timeout");
        }
        return timesNS;
    }

    private static void runWithRecursionLimit( int r, int arg, long singleThreadNanos, Result result) {
        rekLimit = r;
        long    start = System.currentTimeMillis();
        long    fiboResult = fibonacci( arg);
        long    end = System.currentTimeMillis();
        // Steals zählen
        long    currentSteals = fjp.getStealCount();
        long    newSteals = currentSteals - stealCount;
        stealCount = currentSteals;
        long    forksCount = forks.getAndSet( 0);
        final long durMS = end-start;
        System.out.println( "Fib(" + arg + ")=" + fiboResult + " in " + durMS + "ms, recursion limit: " + r +
                " at " + ( singleThreadNanos / 1e6) + "ms, steals: " + newSteals + " forks " + forksCount);
        result.durMS = durMS;
        result.rekLimit = r;
    }

    static long fibonacci( final long arg) {
        FibonacciFork   task = new FibonacciFork( arg);
        long result = fjp.invoke( task);
        forks.set( task.forkCount);
        return result;
    }

    @Override
    protected Long compute() {
        if ( n <= rekLimit) {
            return fibonacci0( n);
        }
        FibonacciFork   ff1 = new FibonacciFork( n-1);
        FibonacciFork   ff2 = new FibonacciFork( n-2);
        ff1.fork();
        long    r2 = ff2.compute();
        long    r1 = ff1.join();
        forkCount = ff2.forkCount + ff1.forkCount + 1;
        return r1 + r2;
    }
}

【讨论】:

  • 这段代码是自己实现工作窃取算法还是我必须自己实现?我研究了 fork join 框架并得出结论,它的工作窃取是它相对于标准固定线程池方法的唯一优势。
  • 工作窃取是 fork-join 的一部分,内置在 JDK 类中,对你没有用。实际上,这并不重要,因为关键是工作规模/完成时间的权衡。看到窃取更像是 fork-join 线程池的监控功能。
【解决方案3】:

这个问题有两个方面。

首先,您拥有技术方面的 Java 资料。由于您对此有一些答案,我将总结这些基础知识:

  • 如果您有 N 个内核,那么只要每个任务仅受 CPU 限制(即不涉及 I/O),N 个线程将为您提供最佳结果
  • 每个Thread 应该做的工作比任务所需的要多,也就是说,让 N 线程计数到 10 会慢得多,因为创建和管理额外的 Threads 的开销高于计数到的好处10 个并行
  • 您需要确保任何同步开销都低于正在完成的工作,即让 N 个 Threads 调用 synchronized 增量方法会慢得多
  • Threads 确实占用资源,最常见的是内存。您拥有的线程越多,估计您的内存使用量就越困难,并且可能会影响 GC 时间(很少见,但我见过这种情况)

其次,你有调度理论。你需要考虑你的程序在做什么

  • 通常使用Threads 来阻止I/O 操作。如果您可以将 CPU 用于其他任务,您不希望您的程序等待网络或 HDD
  • 有几本关于调度的好书(记不起名字了)可以帮助你设计高效的程序。在您提到的示例中,可能存在额外线程有意义的情况。例如如果您的任务没有确定的持续时间,是偏斜的并且您的平均响应时间很重要:假设您有 2 个核心任务和 4 个任务。任务 A 和 B 各需要 1 分钟,但 C 和 D 需要 10 分钟。如果你对 2 个线程运行这些,首先执行 C & D,你的总时间将是 11 分钟,但你的平均响应时间将是 (10+10+11+11)/4=10.5 分钟。如果您针对 4 个线程执行,那么您的响应时间将为 ((1+a)+(1+a)+(10+a)+(10+a))/4=5.5+a,其中a 是调度等待时间近似。这是非常理论化的,因为有许多变量没有解释,但可以帮助设计线程程序。 (同样在上面的示例中,由于您正在等待Futures,您很可能并不关心平均响应时间)
  • 使用多个Thread 池时必须小心。使用多个池可能会导致死锁(如果在两个池之间引入了依赖关系)并且难以优化(池之间可能会产生争用,并且可能无法获得正确的大小)

--编辑--

最后,如果有帮助的话,我对性能的看法是我有 4 种主要资源:CPU、RAM、磁盘和网络。我试图找出哪个是我的瓶颈并使用非饱和资源进行优化。例如,如果我有大量空闲 CPU 和低内存,我可能会压缩我的内存数据。如果我有大量磁盘 I/O 和大内存,请缓存更多数据。如果网络资源(不是实际的网络连接)很慢,请使用许多线程来并行化。一旦您的关键路径上的资源类型饱和并且无法使用其他资源来加速它,您已经达到了最大性能,您需要升级硬件以获得更快的结果。

【讨论】:

  • 如果涉及 I/O 操作会怎样?它如何影响我应该创建的线程数?
  • 取决于 I/O,但例如考虑您要处理两个非常慢的网站,假设每个网站需要 2 分钟才能将数据带到您的程序中,需要 2 秒来处理。如果您在单个线程中执行此操作,则需要 4 分 4 秒,但如果您并行执行此操作,则假设您的系统中有 2 个内核,则需要 2 分 2 秒。我也编辑了帖子...
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-11-28
  • 1970-01-01
  • 2010-11-15
  • 2018-05-20
  • 2015-09-05
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多