【问题标题】:How to prevent Parallel.ForEach loop from changing the number of tasks during runtime?如何防止 Parallel.ForEach 循环在运行时更改任务数?
【发布时间】:2016-02-29 11:49:58
【问题描述】:

我正在使用Parallel.ForEach 循环来完成一些工作,并使用localInit 对其进行初始化,如下所示:

localInit: () => new
{
    foo = new Foo(),
    bars = CreateBars(),
}

根据 MSDN:

localInit,或初始化线程局部变量的函数。 该函数在每个分区调用一次 Parallel.ForEach 操作执行。我们的示例初始化 线程局部变量为零。

所以我尝试这样使用它,但我观察到循环不断地杀死并创建新任务,这导致频繁调用localInit。我的选择会适得其反,不能按预期工作。

我认为当Parallel.ForEach 会创建例如四个分区时,它会让它们保持活动状态,直到它迭代所有项目但它没有。它正在调用localFinallylocalInit 数百次,以获得一个包含几千个项目的集合。怎么样?

可以以某种方式阻止这种行为吗?我真的希望节省一些资源,但它并没有真正让我。


下面是循环的样子:

var parallelLoopResult = Parallel.ForEach
(
    source: items,
    parallelOptions: parallelOptions,
    localInit: () => new
    {
        foo = new Foo(),
        bars = CreateBars(),
    },
    body: (item, loopState, i, local) =>
    {
        parallelOptions.CancellationToken.ThrowIfCancellationRequested();

        var results = local.bars.Select(x => ...).ToList().

        ....

        return local;
    },
    localFinally: local =>
    {
        local.foo.Dispose();
        lock (aggregateLock)
        {
            ... process transformed bars
        }
    }
);

并行选项:

var parallelOptions = new ParallelOptions
{
    CancellationToken = cancellationTokenSource.Token,
#if DEBUG
    MaxDegreeOfParallelism = 1
    //MaxDegreeOfParallelism = Environment.ProcessorCount
#else
    MaxDegreeOfParallelism = Environment.ProcessorCount
#endif
};

【问题讨论】:

  • 什么是sourceIEnumerable<T>Partitioner<T>)?你的ParallelOptions是什么?
  • @svick 项目只是字符串(就像数据库中的键)。 ParallelOptions 只需指定 MaxDegreeOfParallelism (Environment.ProcessorCount) 和 CancellationToken。

标签: c# multithreading parallel.foreach


【解决方案1】:

如果我对the code 的理解正确,Parallel.ForEach() 会每隔几百毫秒重新启动每个Task。这意味着,如果每次迭代都很大(通常应该如此),您将获得大量Tasks,因此会收到大量对localInitlocalFinally 的调用。这样做的原因是对于同一进程中也使用相同 ThreadPool 的其他代码的公平性。

我认为没有办法改变Parallel.ForEach() 的这种行为。我认为解决这个问题的一个好方法是编写自己的简单版本的Parallel.ForEach()。考虑到您可以利用Partitioner<T> 并根据您需要Parallel.ForEach() 的哪些功能,它可能相对简单。例如,类似:

public static void MyParallelForEach<TSource, TLocal>(
    IEnumerable<TSource> source, int degreeOfParallelism,
    Func<TLocal> localInit, Func<TSource, TLocal, TLocal> body, Action<TLocal> localFinally)
{
    var partitionerSource = Partitioner.Create(source).GetDynamicPartitions();

    Action taskAction = () =>
    {
        var localState = localInit();

        foreach (var item in partitionerSource)
        {
            localState = body(item, localState);
        }

        localFinally(localState);
    };

    var tasks = new Task[degreeOfParallelism - 1];

    for (int i = 0; i < degreeOfParallelism - 1; i++)
    {
        tasks[i] = Task.Run(taskAction);
    }

    taskAction();

    Task.WaitAll(tasks);
}

【讨论】:

  • 非常好。谢谢你。我会尝试在我的项目中实现它。看起来很有希望。我需要在这里和那里尝试/捕捉,但我想我明白了。在我点击接受之前给我一些时间;-)
  • 好吧,毕竟没那么难 ;-) 使用这个解决方案,循环的性能翻了一番,甚至几乎翻了三倍:-o 太棒了。可惜标准的 ForEach 有这样的缺点。
  • 我刚刚在 body 和 localFinally 周围添加了一个 try/finally,这样当发生不好的事情时它就不会丢失。我理解公平,但无论如何都应该有一个像 max-performance 这样的选项,这样我们就可以依赖 localInit 每次聚会只调用一次。
【解决方案2】:

每个 thread 执行栏只创建一次。但是你知道完成了多少并行执行吗?并行执行引擎可以自行决定是否启动任意数量的并行执行。

如果要限制并行执行,请使用MaxDegreeOfParallelism 属性。这将对一次创建多少条柱形设置一个上限。它仍然无法控制创建的条形总数,而且条形总数可能低于您现在的预期。

如果您想要明确控制,请手动创建任务。

【讨论】:

  • 我已经用 1、2、4 和 8 MaxDegreeOfParallelism 对其进行了测试。我使用的越多,它重新创建任务的频率就越高。我已经编辑了我的问题,也许现在更清楚了。
  • @t3chb0t:是的……您可以在代码中看到将要启动的并行执行的数量由并行执行引擎决定。您错误地期望您可以事先知道将创建多少条柱。现在,条形图的数量将取决于负载均衡器的感觉。如果它觉得它可以有更多的并行执行,你会有更多的酒吧。如果感觉负载过多,您将看到创建的条形图更少。
  • 哦,这是否意味着任务数量在运行时可能会波动,这可能是频繁初始化的原因?我以为这会让他们活着。嗯......有什么办法可以防止这种行为?我不得不承认这很令人不安。
  • @t3chb0t:您只能使用MaxDegreeOfParallelism 属性对同时并行执行的数量设置上限。 AFAIK,这是您可以拥有的最大控制权。
  • @displayName "每个线程栏只创建一次。"正如问题清楚地表明的那样,这不是真的。即使将MaxDegreeOfParallelism 设置为ProcessorCount,也会完成对localInit 的数百次调用,同时最多使用ProcessorCount 线程。
【解决方案3】:

This overload 不是唯一的,所以你可以试试这个:

var bars = CreateBars();
Parallel.Foreach(bars, b => { /* your action here */};

但是,如果您真的想为每个线程创建bars 的副本,您可以使用 LINQ 中的一些复制方法(假设您的条形图是一个 IEnumerable&lt;T&gt; 变量):

var bars = CreateBars();
localInit: () => new
{
    foo = new Foo(),
    bars = new List<IBar>(bars),
}

【讨论】:

  • 不幸的是,这行不通。条是一种我需要应用于每个项目的转换,因此我尝试为每个线程创建这些条,这样我就不必使用锁。每个线程都应该有自己的实例,但不知何故,我在我的日志中看到,当我调试它一遍又一遍地创建它的代码时,thead id 也总是相同的。我已将整个循环添加到问题中。
  • 哦,我喜欢这个bars = new List&lt;IBar&gt;(bars),我会试试看...为什么我没有想到这个想法它似乎如此明显;-)
  • 我想我知道为什么会发生这种情况。该循环不断地杀死并创建新的任务/线程,因为 localFinally 也经常被调用,而没有完全迭代循环。我可能必须调查TaskScheduler,也许这可以防止它发生。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2023-03-04
  • 2012-01-03
  • 2019-12-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多