【问题标题】:Why is combining Task.Run and plinq so slow?为什么结合 Task.Run 和 plinq 这么慢?
【发布时间】:2019-09-27 22:43:48
【问题描述】:

我发现Task.Run和plinq结合起来非常慢,所以我做了一个简单的实验:

int scale = 32;

Enumerable.Range( 0, scale ).AsParallel().ForAll( i => {
    Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } );
} );

plinq 内的 plinq 运行良好,在 14 毫秒内完成

int scale = 32;

Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( async () =>
{
    Task[] _tasks = Enumerable.Range( 0, scale ).Select( j => Task.Run( () =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } ) ).ToArray();
    await Task.WhenAll( _tasks );
} ) ).ToArray();

await Task.WhenAll( tasks );

任务内部的任务也以 14 毫秒结束,但如果我将 Task.Run 内部替换为 plinq,如下所示:

int scale = 32;

Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( () =>
{
    Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } );
} ) ).ToArray();

await Task.WhenAll( tasks );

执行需要 29 秒。如果scale 变量更大,情况会变得更糟。

谁能解释一下这个案例发生了什么?


编辑:

我又做了一个实验:

static async Task Main( string[] args )
{
    Stopwatch stopwatch = Stopwatch.StartNew();

    int scale = 8;

    Task[] tasks = Enumerable.Range( 0, scale ).Select( id => Run( scale, id ) ).ToArray();

    await Task.WhenAll( tasks );

    Console.WriteLine( $"ElapsedTime={stopwatch.ElapsedMilliseconds}ms" );
}

static Task Run( int scale, int id )
{
    return Task.Run( () =>
    {
        Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
        {
            for ( int k = 0; k < scale; k++ )
            {

            }

            Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} for loop {j} end" );
        } );
    } );
}

这是结果的一部分:

[1557475215796]Task 0 for loop 6 end
[1557475215796]Task 0 for loop 7 end
[1557475216776]Task 4 for loop 0 end
[1557475216776]Task 4 for loop 1 end
[1557475216777]Task 4 for loop 2 end
[1557475216777]Task 4 for loop 3 end
[1557475216778]Task 4 for loop 4 end
[1557475216778]Task 4 for loop 5 end
[1557475216779]Task 4 for loop 6 end
[1557475216780]Task 4 for loop 7 end
[1557475217774]Task 5 for loop 0 end
[1557475217774]Task 5 for loop 1 end
[1557475217775]Task 5 for loop 2 end

查看每个任务之间的时间戳,你会发现每次移动到下一个任务时都有一个神秘的 1000 毫秒延迟。我猜 plinq 或任务中有一种机制会在某些情况下暂停一秒钟,这会显着减慢进程。


感谢@StephenCleary 的解释,现在我明白延迟来自线程的创建。我再次调整我的实验,发现ForAll 方法会阻塞任务,直到不同任务中的所有其他ForAll 方法完成。

static Task Run( int scale, int id )
{
    return Task.Run( () =>
    {
        Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
        {
            for ( int k = 0; k < scale; k++ )
            {

            }

            Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} for loop {j} end, thread count = {Process.GetCurrentProcess().Threads.Count}" );
        } );
        Console.WriteLine( $"[{DateTimeOffset.Now.ToUnixTimeMilliseconds()}]Task {id} finished" );
    } );
}

结果是:

[1557478553656]Task 6 for loop 6 end, thread count = 19
[1557478553657]Task 6 for loop 7 end, thread count = 19
[1557478554645]Task 7 for loop 0 end, thread count = 20
[1557478554647]Task 7 for loop 1 end, thread count = 20
[1557478554649]Task 7 for loop 2 end, thread count = 20
[1557478554651]Task 7 for loop 3 end, thread count = 20
[1557478554653]Task 7 for loop 4 end, thread count = 20
[1557478554655]Task 7 for loop 5 end, thread count = 20
[1557478554657]Task 7 for loop 6 end, thread count = 20
[1557478554659]Task 7 for loop 7 end, thread count = 20
[1557478555644]Task 1 finished
[1557478555644]Task 0 finished
[1557478555644]Task 3 finished
[1557478555644]Task 2 finished
[1557478555644]Task 4 finished
[1557478555644]Task 6 finished
[1557478555644]Task 5 finished
[1557478555644]Task 7 finished

我希望ForAll 方法应该立即返回。为什么会阻塞任务和线程?

【问题讨论】:

  • 我想这会回答你的问题:stackoverflow.com/questions/19102966/….
  • 我不确定
  • 这里的差异是由于在一组Task.Run 的情况下使用异步,这将处理内部Task.Run,但在Task.Run 内部使用PLinq 时,它不是做同样的蜂巢是可行的,因为你阻塞了每一个外部任务,虽然你可能想使用异步包装器而不是同步,但是会付出很多努力,并不值得
  • 两者都不慢。这段代码虽然有问题。首先,在PLINQParallel.ForEach 中使用Task.Run 是没有意义的。所有 CPU 内核已经用于处理输入数据。使用Task.Run 生成的任何新任务都必须等到操作系统调度程序挂起 PLINQ 线程并开始执行新任务
  • 在前两个 sn-ps 中,嵌套循环会损害性能。 CPU 不能同时运行比核心更多的任务,因此额外的任务必须等待调度。 PLINQ 和Parallel.ForEach 用于数据并行,您希望所有内核并行工作以处理大量数据。 PLINQ 通过创建与内核一样多的任务、对输入数据进行分区并将每个分区提供给工作任务来实现这一点。

标签: c# task-parallel-library plinq


【解决方案1】:

问题显然出在您的代码中,让我们回顾一下各种代码 sn-ps,尤其是那些使用 Task 的代码,因为 PLinq 内的 PLinq 很简单,它几乎使用了所有可能的线程/内核来处理尽可能快,不会有太多的上下文转换,因为处理在内存中并且很快。事实上PLinq本身会管理/控制并行调用的数量,而Task.Run是相对独立的。

片段 1

int scale = 32;

Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( async () =>
{
    Task[] _tasks = Enumerable.Range( 0, scale ).Select( j => Task.Run( () =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } ) ).ToArray();
    await Task.WhenAll( _tasks );
} ) ).ToArray();

await Task.WhenAll( tasks );
  • 这里你的完整处理在内存中,每一个外部Task,异步调度内部循环,而Task本身不会阻塞线程,等待内部Tasks完成,所以外部Task.Run,内部@时会异步通知987654328@已完成

现在慢代码会发生什么,让我们回顾一下

片段 2

int scale = 32;

Task[] tasks = Enumerable.Range( 0, scale ).Select( i => Task.Run( () =>
{
    Enumerable.Range( 0, scale ).AsParallel().ForAll( j =>
    {
        for ( int k = 0; k < scale; k++ ) { }
    } );
} ) ).ToArray();

await Task.WhenAll( tasks );
  • 这里每个Task.Run 不会将请求异步移交给内部PLinq 调用,并且会发生什么情况是Task.Run 调用的线程将被阻止以完成内部PLinq,这是问题的主要来源,从而导致高竞争。

如上所述,Task.Run 调用 PLinqPLinq 调用 PLinq 之间存在很大差异,因此关键在于了解这些不同的 API 如何单独工作以及组合的影响是什么它们可以按照您的代码所期望的那样协同工作。

【讨论】:

  • 感谢您的回答。我想知道为什么是29秒?它是 14 毫秒的 2000 倍。由于内部只有一个简单的 for 循环,因此这个结果听起来高得离谱。我猜在这种情况下 plinq 或 Task 内部存在未知开销。
  • 欢迎@LeisenChang,这是一个棘手的部分,因为有很多因素可能会影响,比如服务器配置,假设您使用的是32核系统,在慢速系统中将有32个任务准备好在外部循环中占用它们,但是您可以做一些简单的测试,尝试减少外部刻度数并保持内部与 32 相同,可以从较低的值开始,例如 2 然后 4、6、8,这将对您有所帮助知道性能如何以及何时在外部循环开始下降/可枚举值增加。简单地说,外部级别的阻塞可能会成倍地影响性能
  • 我已经编辑了我的帖子。我发现每个任务的执行之间都有一个奇怪的延迟。你知道为什么吗?
  • @LeisenChang:因为线程池的线程注入率是有限的。
  • @LeisenChang 也是因为 CPU 不能同时运行比内核数更多的线程。 PLINQ/Parallel.Foreach 使用所有可用的内核来处理输入数据,这意味着任何新任务都必须等到操作系统挂起 PLINQ 线程并启动任务的线程
猜你喜欢
  • 2021-08-27
  • 1970-01-01
  • 2019-09-16
  • 2012-08-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-09-03
相关资源
最近更新 更多