【问题标题】:Why does the localInit Func get called multiple times per thread in Parallel.ForEach为什么在 Parallel.ForEach 中每个线程多次调用 localInit Func
【发布时间】:2013-01-19 23:55:22
【问题描述】:

我正在编写一些代码来处理大量数据,我认为让 Parallel.ForEach 为它创建的每个线程创建一个文件会很有用,这样输出就不需要同步(至少我)。

看起来像这样:

Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
    (item, state, writer)=>
    {
        if(something)
        {
            state.Break();
            return writer;
        }
        List<Result> results = new List<Result>();

        foreach(var subItem in item.SubItems)
            results.Add(ProcessItem(subItem));

        if(results.Count > 0)
        {
            foreach(var result in results)
                result.Write(writer);
        }
        return writer;
    },
    (writer)=>writer.Dispose());

我预计会创建多达 8 个文件,并且会在整个运行期间持续存在。然后,当整个 ForEach 调用完成时,每个都将被处置。真正发生的是 localInit 似乎为每个项目调用一次,所以我最终得到了数百个文件。编写器也会在处理的每个项目结束时进行处理。

这表明发生了同样的事情:

var vals = Enumerable.Range(0, 10000000).ToArray();
        long sum = 0;
        Parallel.ForEach(vals,
            new ParallelOptions { MaxDegreeOfParallelism = 8 },
            () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
            (i, state, common) =>
            {
                Thread.Sleep(10);
                return common + i;
            },
                (common) => Interlocked.Add(ref sum, common));

我明白了:

init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18

注意:如果我省略了 Thread.Sleep 调用,它有时似乎“正常”运行。对于它决定在我的电脑上使用的 4 个线程,localInit 只被调用一次。然而,并非每次都如此。

这是函数的期望行为吗?导致它这样做的幕后发生了什么?最后,获得所需功能 ThreadLocal 的好方法是什么?

顺便说一下,这是在 .NET 4.5 上的。

【问题讨论】:

    标签: c# .net task-parallel-library .net-4.5


    【解决方案1】:

    Parallel.ForEach 不像你想象的那样工作。需要注意的是,该方法建立在Task 类之上,并且TaskThread 之间的关系不是1:1。例如,您可以有 10 个任务在 2 个托管线程上运行。

    尝试在方法体中使用这一行而不是当前行:

    Console.WriteLine("ThreadId {0} -- TaskId {1} ",
                      Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
    

    您应该看到ThreadId 将在许多不同的任务中重复使用,由它们的唯一 ID 显示。如果您留下或增加对Thread.Sleep 的呼叫,您会看到更多。

    Parallel.ForEach 方法如何工作的(非常)基本思想是,它需要您的枚举创建一系列任务,这些任务将运行枚举的进程部分,完成方式很大程度上取决于输入。还有一些特殊的逻辑可以检查任务超过一定毫秒数而没有完成的情况。如果这种情况属实,那么可能会产生一个新任务来帮助减轻工作量。

    如果您查看Parallel.ForEachlocalinit 函数的文档,您会注意到它说的是returns the initial state of the local data for each _task_,而不是每个线程

    您可能会问为什么会生成超过 8 个任务。该答案与 ParallelOptions.MaxDegreeOfParallelism 的文档中的最后一个答案相似。

    从默认值更改MaxDegreeOfParallelism 只会限制将使用多少个并发任务。

    此限制仅针对并发任务的数量,而不是对在整个处理过程中将创建的任务数量的硬限制。正如我上面提到的,有时会产生一个单独的任务,这会导致您的 localinit 函数被多次调用并将数百个文件写入磁盘。

    写入磁盘肯定是一个有一点延迟的操作,尤其是在您使用同步 I/O 时。当磁盘操作发生时,它会阻塞整个线程; Thread.Sleep 也是如此。如果Task 这样做,它将阻塞当前正在运行的线程,并且没有其他任务可以在其上运行。通常在这些情况下,调度程序会生成一个新的Task 来帮助弥补这一缺陷。

    最后,获得我想要的功能 ThreadLocal 的好方法是什么?

    底线是线程局部变量对Parallel.ForEach 没有意义,因为您没有处理线程;你正在处理任务。本地线程可以在任务之间共享,因为许多任务可以同时使用同一个线程。此外,任务的本地线程可能会在执行过程中发生变化,因为调度程序可以抢占它的运行,然后在另一个线程上继续执行,该线程将具有不同的本地线程。

    我不确定最好的方法,但是您可以依靠localinit 函数来传递您想要的任何资源,一次只允许一个资源在一个线程中使用。您可以使用localfinally 将其标记为不再使用,从而可供其他任务获取。这就是这些方法的设计目的;每个方法只在每个生成的任务中调用一次(请参阅Parallel.ForEach MSDN 文档的备注部分)。

    您也可以自己拆分工作,创建自己的线程集并运行您的工作。但是,在我看来,这不是什么好主意,因为 Parallel 类已经为您完成了这项繁重的工作。

    【讨论】:

    • 但是为什么Parallel.ForEach()在这种情况下会创建超过8个Tasks?
    • @svick 因为它只限制了一次运行的 8 个 并发 任务。您不能限制方法生命周期内创建 的任务数。当任务花费的时间超过指定的时间时,Parallel 类中的逻辑会导致任务的状态被保存并稍后被复制。这样做是为了提高性能。如果范围被分成更小的任务,正在等待的线程(空队列)可以从其他工作线程的任务队列中窃取工作。这可以防止不均匀的工作负载,其中线程会慢慢停止工作,直到一个人试图完成。
    【解决方案2】:

    您所看到的是试图尽快完成工作的实施。

    为此,它尝试使用不同数量的任务来最大化吞吐量。它从线程池中获取一定数量的线程并运行您的工作一段时间。然后它尝试添加和删除线程以查看会发生什么。它会继续这样做,直到您完成所有工作。

    该算法非常愚蠢,因为它不知道您的工作是使用大量 CPU 还是大量 IO,或者即使有很多同步并且线程相互阻塞。它所能做的就是添加和删除线程并测量每个工作单元完成的速度。

    这意味着它会在注入和退出线程时不断调用您的 localInitlocalFinally 函数 - 这就是您所发现的。

    不幸的是,没有简单的方法来控制这个算法。 Parallel.ForEach 是一种高级构造,它有意隐藏了大部分线程管理代码。


    使用ThreadLocal 可能会有所帮助,但它依赖于这样一个事实,即当Parallel.ForEach 请求新线程时,线程池将重用相同的线程。这不是保证 - 事实上,线程池不太可能在整个调用中恰好使用 8 个线程。这意味着您将再次创建不必要的文件。


    保证的一件事是Parallel.ForEach 在任何时候都不会使用超过MaxDegreeOfParallelism 的线程。

    您可以通过创建一个固定大小的文件“池”来利用这一点,以便在特定时间运行的任何线程都可以重复使用这些文件。您知道只有MaxDegreeOfParallelism 线程可以同时运行,因此您可以在调用ForEach 之前创建该数量的文件。然后在您的localInit 中抓取一个并在您的localFinally 中释放它。

    当然,您必须自己编写这个池,并且它必须是线程安全的,因为它会被并发调用。不过,一个简单的锁定策略应该就足够了,因为与锁定成本相比,线程不会很快注入和退出。

    【讨论】:

    • 为什么不使用并发集合(如ConcurrentQueue)而不是锁定?
    • @svick +1 是的 - 这会更容易。他还可以使用Lazy&lt;some file wrapper&gt; 的集合,这样在需要之前不会创建文件。
    【解决方案3】:

    根据MSDNlocalInit 方法为每个任务调用一次,而不是为每个线程调用一次:

    对于参与循环执行的每个任务都会调用一次 localInit 委托,并返回每个任务的初始本地状态。

    【讨论】:

      【解决方案4】:

      localInit 在线程创建时调用。 如果 body 需要很长时间,它必须创建另一个线程并挂起当前线程, 如果它创建另一个线程,它会调用 localInit

      当 Parallel.ForEach 调用它时,它也会创建与 MaxDegreeOfParallelism 值一样多的线程,例如:

      var k = Enumerable.Range(0, 1);
      Parallel.ForEach(k,new ParallelOptions(){MaxDegreeOfParallelism = 4}.....
      

      它在第一次调用时创建了 4 个线程

      【讨论】:

      • 这不准确,localInit 在创建 Task 时被调用,而不是 Thread。而Parallel.ForEach() 不能直接与Threads 一起使用,它可以与Tasks 一起使用并让ThreadPool 担心Threads。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多