【问题标题】:Why does .AsParallel() hang when run inside a Task?为什么 .AsParallel() 在任务中运行时会挂起?
【发布时间】:2016-04-08 06:17:53
【问题描述】:

在下面的简化代码中,我生成了 200 个任务。每个任务都需要经过一个由锁保护的关键区域。锁内部是一个 .AsParallel() 语句。当我运行程序时,什么也没有发生。程序无限期挂起,不打印任何内容。

private static object lockObject = new object();

static void Main(string[] args)
{
    RunTasks();
}

private static void RunTasks()
{
    List<Task> tasks = new List<Task>();
    for (int i = 0; i < 200; i++)
    {
        tasks.Add(Task.Factory.StartNew(PerformComputations));
    }

    Task.WaitAll(tasks.ToArray());
}

private static void PerformComputations()
{
    // Computations

    lock (lockObject)
    {
        // The actual operations performed here are irrelevant. The key is that they use .AsParallel()
        foreach (int i in Enumerable.Range(0, 500).AsParallel().Select(i => i))
        {
            Console.WriteLine(i);
        }
    }

    // Additional computations
}

但是,如果 RunTasks 是这样实现的,一切都会正常运行(虽然速度很慢):

Parallel.For(0, 200, i =>
{
    PerformComputations();
});

如果我从 PerformComputations 中删除 .AsParallel() 语句,一切都会正常工作。

问题:

  1. 为什么原密码会被锁定?
    • 我的最佳猜测是 RunTasks 产生了 200 个任务,这比我机器上的物理内核数还多。 PerformComputations 中的 lock 语句确保除了一个任务之外的所有任务都被阻塞。当未阻塞线程运行并行查询时,它会将另一个任务排队。但是,最大数量的活动任务已经处于活动状态,因此新任务将永远闲置在队列中。
    • 这是准确的吗?谁能指出我可以确认或更详细解释的文档?
  2. 为什么修改版的 RunTasks 可以工作?
    • 难道只是 Parallel.For 队列小于最大活动任务数吗?
  3. 有没有办法编写 PerformComputations 以使其与原始 RunTasks 方法一起工作但仍并行运行?

【问题讨论】:

  • 我的一部分人会猜测到控制台的 500 个转储会很快排队,因此,通过执行 for 循环,有效地在几毫秒内完成整个执行计算,控制台只需要更长的时间滚动..而因为您在修改后的内容中并行启动任务,它可能看起来更随机,因为有些任务实际上是同时开始的,而不是一个接一个,当它们可能有时间在下一个完全开始之前完成时?
  • 在原始代码中,没有线程可以访问 Console.WriteLine 语句。如果我在该行上放置一个断点,它永远不会被击中,应用程序只是继续无限期地运行而没有 CPU 使用率。修改后的代码并非如此。
  • 你应该避免在任务中完全阻塞代码,而不是使用Task.WaitAll,你应该使用Task.WhenAll或新的async/await语法来指定当所有这些子任务完成时应该发生什么。
  • 此测试没有显示 Task.StartNew 和 Parallel.For 之间有任何区别。您会发现没有任务的简单循环运行速度与第一个版本一样快。由于各种原因,这两个版本都不好,例如lock 意味着一次只能运行一个任务,使并行化毫无意义。
  • 鉴于您没有此时有任何方法并行运行,您只需清理代码即可获得并行化。如果您取消锁定,则每种方法都有效。例如。 仅使用 AsParallel。仅使用 Parallel.For*。一次启动所有任务,虽然 this 在实际场景中会因为抖动而变慢

标签: c# .net multithreading task-parallel-library task


【解决方案1】:

您对问题 #1 和 #2 的回答绝对正确。

回答 #3:您可以在创建任务时指定 TaskCreationOptions.LongRunning。根据https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskcreationoptions(v=vs.110).aspx 的文档,这将为任务调度程序提供一个提示,表明该任务可能需要一个额外的线程,这样它就不会阻塞其他线程或工作项的前进进度本地线程池队列。

实际上,这会使任务系统忽略线程池,而只是为您的任务提供一个新的专用线程。

【讨论】:

  • 锁强制所有版本同步工作。更改选项不会改变这一点。
  • 否,但它将允许来自具有锁的线程的 AsParallel 调用在线程池上运行,因为它没有填充来自任务工厂的请求。
  • PLINQ 和 Parallel 都使用 ThreadPool。两者都生成任务。两者都将从与内核一样多的任务开始,对数据进行分区并将其提供给它们 - 无需生成 更多 个可以执行的任务。这里假设的差异是因为 Task.Start 一次创建 200 个任务,然后必须按顺序执行。 Parallel 虽然会分批打破 2000 次迭代并将它们分配给任务,但仍然必须按顺序执行。
【解决方案2】:

Parallel.For 和 .ForEach 方法以及 System.Collection.Concurrent 命名空间确实让您轻松处理此类问题。调度程序根据进程优先级、系统工作负载、内核数量等为您处理线程管理......并行化变得容易:

    static void Main(string[] args)
    {
        RunTasks();
    }

    // This sets up the parallel scheduler to use UP TO 16 simultaneous threads. In reality the thread
    // workload is managed by the CLR according to how many logical threads you have available on your
    // processor.
    private static readonly ParallelOptions _po = new ParallelOptions() { MaxDegreeOfParallelism = 16 };

    private static void RunTasks()
    {
        // Run 200 instances of PerformComputations in parallel. 
        Parallel.For(0, 200, _po, i => PerformComputations());
    }

    private static void PerformComputations()
    {
        // If you want to run the 500 iterations in parallel (sequence is not important),
        // use a concurrent collection. This needs absolutely no lock, the collection is
        // partitioned internally to avoid having to lock. Same goes if you need to share
        // data between multiple runs of PerformComputations(), declare a static bag at 
        // class level.
        var theBag = new ConcurrentBag<int>(Enumerable.Range(0, 500));
        Parallel.ForEach(theBag, _po, i =>
        {
            Console.WriteLine(i.ToString());
        });

        // Otherwise you don't need a lock at all anyway since each element here is treated
        // one at a time in sequence.
        var theList = Enumerable.Range(0, 500).ToList();
        foreach (var i in theList)
        {
            Console.WriteLine(i.ToString());
        }
    }

【讨论】:

    【解决方案3】:

    是的,您是正确的 - 原始代码锁定在 PerformComputations 的并行部分。

    LongRunning 强制创建一个全新的非线程池线程(告诉调度程序为任务创建一个新线程)。注意:您可能会创建许多线程,从而导致内存开销和切换开销等问题。

    private static void RunTasks()
    {
        List<Task> tasks = new List<Task>();
        for (int i = 0; i < maxLoops; i++)
        {
            tasks.Add(Task.Factory.StartNew(PerformComputations, TaskCreationOptions.LongRunning));
        }
    
        Task.WaitAll(tasks.ToArray());
    }
    

    有趣的阅读:Parallelism in .NET

    回答问题 3:如果您不介意创建多个线程(使用 Parallel.For)与合并结果(AsParallel().Select)。

    private static void PerformComputations()
    {
        lock (lockObject)
        {
            Parallel.For(0, 500, i =>
            {
                Console.WriteLine(i);
            });
        }
    }
    

    【讨论】:

    • PerformComputations 中的锁会强制所有方法按顺序执行。唯一的区别是每个阻塞是如何造成的。
    【解决方案4】:

    首先,我不明白你为什么要使用AsParallel()。如果您有 200 个大部分独立的Tasks,那应该足以充分利用您的 CPU。这尤其令人困惑,因为AsParallel() 并行执行的唯一操作是无用的Select()

    现在,实际回答您的问题:

    我的最佳猜测是 RunTasks 产生了 200 个任务,这比我机器上的物理内核数还多。

    核心数量无关紧要。可用线程的数量更为重要。 TPL 使用ThreadPool,它对每秒创建的线程数有限制,对线程总数也有硬性限制。如果您达到第一个限制,您的代码可能会慢下来(并且看起来 什么都不做)。如果达到第二个限制,您的代码实际上会死锁并停止工作。

    第一个限制是不可配置的或有据可查的,the second limit is

    在任何情况下,达到这些限制中的任何一个都表明您的代码在并行性方面设计不佳。

    为什么修改版的 RunTasks 可以工作?是不是只有Parallel.For queues小于最大活动任务数?

    是的,Parallel.For 使用较少数量的Tasks,因为这样更有效。

    有没有办法编写 PerformComputations 使其与原始 RunTasks 方法一起工作但仍并行运行?

    我不明白你为什么要这样做。就像我之前说过的,我认为并行运行 Select() 没有任何意义。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-06-09
      • 2012-02-17
      • 1970-01-01
      相关资源
      最近更新 更多