【问题标题】:C# Parallel.For and non-initialized arraysC# Parallel.For 和未初始化的数组
【发布时间】:2019-01-02 08:08:15
【问题描述】:

场景是这样的:在 Parallel.For 内部,一个数组用于非并行 for。数组的所有元素都被覆盖,因此在技术上没有必要分配和初始化它(这总是在构造时发生,据我从 C# 教程中推断):

float[] result = new float[16384];
System.Threading.Tasks.Parallel.For(0,16384,x =>
{
   int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
   for (int i = 0; i < histogram.Length; i++)
   {
      histogram[i] = some_func(); // each element in histogram[] is written anew
   }
   result[x] = do_something_with(histogram);
});

顺序代码中的解决方案很简单:将数组拉到外部for循环的前面:

float[] result = new float[16384];
int[] histogram = new int[32768]; // allocation and initialization with      
for(x = 0; x < 16384; x++)
{
   for (int i = 0; i < histogram.Length; i++)
   {
      histogram[i] = some_func(); 
   }
   restult[x] = do_something_with(histogram);
}

现在在外循环中既没有分配也没有徒劳的 0-ing。 在并行版本中,这肯定是一个糟糕的举动,或者并行进程正在破坏彼此的直方图结果,或者 C# 足够聪明地锁定 histogram 从而关闭任何并行性。分配histogram[16384,32768] 同样是浪费。我现在正在尝试的是

public static ParallelLoopResult For<TLocal>(
    int fromInclusive,
    int toExclusive,
    Func<TLocal> localInit,
    Func<int, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally
)

库构造(函数?),但由于这是我第一次尝试在 C# 中进行并行编程,我充满了疑问。以下是顺序情况的正确翻译吗?

float[] result = new float[16384];
System.Threading.Tasks.Parallel.For<short[]>(0, 16384, 
                                             () => new short[32768], 
                                             (x, loopState, histogram) =>
{
    for (int i = 0; i < histogram.Length; i++)
    {
       histogram[i] = some_func(); 
    }
    result[x] = do_something_with(histogram);
    return histogram;
}, (histogram) => { });

【问题讨论】:

  • 您考虑过将结果写入 ConcurrentBag 吗?这是我认为您正在寻求的线程安全实现:msdn.microsoft.com/en-us/library/dd381779(v=vs.110).aspx
  • @mjwills 澄清一下:如果此代码在 32768 核机器上运行,则需要这么多版本的直方图,但较小规模的并行性表明我不会分配全部如果在任何时候我只需要一把可以立即处理的。
  • 一种选择是为Parallel.For 设置MaxDegreeOfParallelism。然后保持与 MaxDop 大小相同的数组池(基本上是数组数组)。 Interlocked.Increment 获取每次迭代的唯一 ID。 iterationID % MaxDop 获取要使用的池的索引。这为您提供了一个小数组(例如 8 个数组)而不是需要 16384 个数组,并保证一次只有一个线程使用它。 公平地说,这与您在 Parallel.For 线程本地解决方案中使用的方法基本相同。
  • 你能详细说明你想用这段代码完成什么,可能有更好的方法。
  • 具体你关心用多少线程来处理结果?

标签: c# for-loop task-parallel-library allocation


【解决方案1】:

我不完全确定您的要求,但让我们看一个起点:

public void Original()
{
    float[] result = new float[16384];
    System.Threading.Tasks.Parallel.For(0, 16384, x =>
    {
        int[] histogram = new int[32768]; // allocation and initialization with all 0's, no?
        for (int i = 0; i < histogram.Length; i++)
        {
            histogram[i] = some_func(); // each element in histogram[] is written anew
        }
        result[x] = do_something_with(histogram);
    });
}

内部循环生成histogram,而外部循环获取histogram,并使用它在Results 中生成单个值。

一个易于操作的解决方案是执行此处理TPL-Dataflow,这是 TPL 之上的抽象。为了进行设置,我们需要一些 DTO 来通过数据流管道。

public class HistogramWithIndex
{
    public HistogramWithIndex(IEnumerable<int> histogram, int index)
    {
        Histogram = histogram;
        Index = index;
    }
    public IEnumerable<int> Histogram { get; }
    public int Index { get; }
}

public class IndexWithHistogramSize
{
    public IndexWithHistogramSize(int index, int histogramSize)
    {
        Index = index;
        HistogramSize = histogramSize;
    }
    public int Index { get; }
    public int HistogramSize { get; }
}

这些类代表您在不同处理阶段的数据。现在让我们看看管道。

public async Task Dataflow()
{
    //Build our pipeline
    var options = new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        //This is default but I want to point it out
        EnsureOrdered = true
    };
    var buildHistorgramBlock = new TransformBlock<IndexWithHistogramSize, HistogramWithIndex>(inputData =>
    {
        var histogram = Enumerable.Range(0, inputData.HistogramSize).Select(_ => some_func());
        return new HistogramWithIndex(histogram, inputData.Index);
    }, options);
    var doSomethingBlock = new TransformBlock<HistogramWithIndex, int>(x => do_something_with(x.Histogram.ToArray()), options);

    var resultBlock1 = new ActionBlock<int>(x => Results1.Add(x), options);
    //var resultBlock2 = new ActionBlock<int>(x => //insert into list with index, options);

    //link the blocks
    buildHistorgramBlock.LinkTo(doSomethingBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    doSomethingBlock.LinkTo(resultBlock1, new DataflowLinkOptions() { PropagateCompletion = true });

    //Post data
    var histogramSize = 32768;
    foreach (var index in Enumerable.Range(0, 16384))
    {
        await buildHistorgramBlock.SendAsync(new IndexWithHistogramSize(index, histogramSize));
    }

    buildHistorgramBlock.Complete();
    await resultBlock1.Completion;
} 

由两个TransformBLocksActionBlock 组成的块形成一个链接的管道。这里的好处是可以很容易地改变并行度、每个块引入背压的有界容量等等。

一个重要说明:TransformBlocks,如果使用并行性,即 MDOP >1,那么他们将按照收到的顺序输出他们的项目。这意味着如果他们按顺序进来,他们就会按顺序离开。您还可以使用阻止选项Ensure Ordering 关闭排序。如果您希望您的项目在没有/有特定顺序的特定索引中,这就会发挥作用。

这似乎有点矫枉过正,可能适合您的项目。但我发现这非常灵活且易于维护。尤其是当您开始向处理链中添加步骤时,添加一个块比围绕所有内容包装另一个 for 循环要干净得多。

这是 c&p 的其余样板代码

private ConcurrentBag<int> Results1 = new ConcurrentBag<int>();
private int some_func() => 1;
private int do_something_with(int[] i) => i.First();

【讨论】:

  • 感谢这个酷而有趣的答案。虽然我可能不会为手头的特定问题采用它(我的 for 循环正在运行,因此我可以离开现场:D)我肯定会适应这种并行性形式,因为它也是我所设想的“正确”形式. (对于我最初的问题“我的代码是否错误”:看起来您隐含地说“不”,对吗?)
  • 好吧,如果代码“有效”,那么我会说它没有错。另一方面,总有办法改进代码。我的回答只是选择一种方便的技术,您可以使用响应式扩展等。处理异步和当前开发的技术有很多。
【解决方案2】:

你在正确的轨道上。

循环的 16K 次迭代将由少数线程处理。如果你在循环外捕获一个局部变量,那么每次迭代都会共享同一个对象。如果您声明一个本地并在循环内分配一个对象,那么每个您将有 16K 分配。

有一个可以分配对象的中间位置,它被称为Thread Local Variables

它主要用于累积全局结果但希望最小化同步开销的场景。因此,每个运行循环迭代的线程都会获得自己的状态副本,然后最后您可以聚合它们。

但是使用它在多个循环迭代中重用 a 缓冲区也很好。

【讨论】:

    【解决方案3】:

    您在任务和线程之间共享的越少,并行化您的代码就越容易。

    如果您想减少分配,您可以重复使用缓冲区。这不仅会减少分配,还会减少昂贵的垃圾收集。只是不要将它们存储在线程本地状态,否则您将不得不自己处理分配、重新分配和清除它们。

    Parallel.ForEach 使用任务,而不是线程,这意味着服务于一项任务的线程最终可以服务于完全不相关的东西,但保留一个没人需要的缓冲区。此外,Parallel.Foreach 可以配置为 recycle 任务,以 .... 清理累积状态。在这种情况下,您必须从线程状态中清除缓冲区并重新初始化它们。

    使用内存/缓冲池来“租用”现成的缓冲区并在使用完它们后“释放”它们会更容易且性能更高。

    过去,您可以使用 BufferManager 类提供现成的 byte[] 数组,并被 WCF 用于重用缓冲区。

    现在更好的选择是使用 System.Buffers 包中的 ArrayPool,它可以返回任何类型的数组:

    var pool = ArrayPool<int>.Shared;
    var bufferLength = 32768;
    
    var result = new float[16384];
    Parallel.For(0,16384,x =>
    {
       try
       {
           var histogram = pool.Rent(bufferLength);
           for (int i = 0; i < bufferLength; i++)
           {
              histogram[i] = some_func(); 
           }
           result[x] = do_something_with(histogram);
        }
        finally
        {
           //Ensure the buffer is returned even in case of error
           pool.Return(histogram);
        }
    });
    

    您指定的尺寸是最小值。如果没有与请求匹配的可用缓冲区,则池可能会返回更大的缓冲区,这就是为什么您必须在循环中使用bufferLength

    在大多数情况下使用共享池应该没问题。它的默认最大数组大小为 1MB (1 048 576),每个大小存储桶 50 个数组。 ArrayPool 将相同大小的缓冲区组织在桶中,以便更快地访问。

    如果需要,您可以创建另一个具有特定最大缓冲区大小的池。创建数组时可以指定每个桶中最多可以放置多少个数组,例如:

    var maxLength = 32768;
    var pool = ArrayPool<int>.Create(maxLength,1024);
    var bufferLength = 32768;
    

    ArrayPool 的代码并不复杂。你可以查看here

    【讨论】:

      猜你喜欢
      • 2017-01-14
      • 2021-06-08
      • 2013-04-25
      • 1970-01-01
      • 2013-05-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多