【问题标题】:Why does Parallel.Foreach create endless threads?为什么 Parallel.Foreach 会创建无限线程?
【发布时间】:2012-06-22 00:07:51
【问题描述】:

下面的代码继续创建线程,即使队列为空..直到最终发生 OutOfMemory 异常。如果我用常规 foreach 替换 Parallel.ForEach,则不会发生这种情况。有人知道为什么会发生这种情况吗?

public delegate void DataChangedDelegate(DataItem obj);

public class Consumer
{
    public DataChangedDelegate OnCustomerChanged;
    public DataChangedDelegate OnOrdersChanged;

    private CancellationTokenSource cts;
    private CancellationToken ct;
    private BlockingCollection<DataItem> queue;

    public Consumer(BlockingCollection<DataItem> queue) {
        this.queue = queue;
        Start();
    }

    private void Start() {
        cts = new CancellationTokenSource();
        ct = cts.Token;
        Task.Factory.StartNew(() => DoWork(), ct);
    }

    private void DoWork() {

        Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
            if (item.DataType == DataTypes.Customer) {
                OnCustomerChanged(item);
            } else if(item.DataType == DataTypes.Order) {
                OnOrdersChanged(item);
            }
        });
    }
}

【问题讨论】:

  • 您是否尝试过以低并行度(如 4、8 等)运行 foreach 并查看问题是否仍然存在?
  • 这确实限制了创建的线程数量(但我无法跟上生产者的步伐)。那么这对原始代码意味着什么? TPL 池是否应妥善管理此问题?
  • 如何实例化消费者?你可能是在循环中执行此操作吗?

标签: c# .net multithreading task-parallel-library


【解决方案1】:

我认为Parallel.ForEach() 主要用于处理有界集合。它不会期望像GetConsumingPartitioner() 返回的集合,其中MoveNext() 会阻塞很长时间。

问题在于Parallel.ForEach() 试图找到最佳并行度,因此它启动的Tasks 与TaskScheduler 允许它运行的数量一样多。但是TaskScheduler 看到有很多Tasks 需要很长时间才能完成,而且他们什么也没做(他们阻止)所以它继续开始新的。

我认为最好的解决方案是设置MaxDegreeOfParallelism

作为替代方案,您可以使用 TPL Dataflow 的 ActionBlock。在这种情况下,主要区别在于ActionBlock 在没有要处理的项目时不会阻塞任何线程,因此线程数不会接近限制。

【讨论】:

  • GetConsumingPartitioner 是专门为 BlockingCollection 和 Parallel.ForEach 使用而创建的,请查看 blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx
  • 是的,我确实读过。它确实避免了GetConsumingCollection() 的一些问题,但不是全部。
  • 这应该是答案
【解决方案2】:

生产者/消费者模式主要用于只有一个生产者和一个消费者的情况。

但是,您尝试实现的目标(多个消费者)更符合工作列表模式。以下代码摘自犹他大学教授的并行编程课程中 unit2 幻灯片“2c - 共享内存模式”的幻灯片,可在 http://ppcp.codeplex.com/ 下载。

BlockingCollection<Item> workList;
CancellationTokenSource cts;
int itemcount

public void Run()
{
  int num_workers = 4;

  //create worklist, filled with initial work
  worklist = new BlockingCollection<Item>(
    new ConcurrentQueue<Item>(GetInitialWork()));

  cts = new CancellationTokenSource();
  itemcount = worklist.Count();

  for( int i = 0; i < num_workers; i++)
    Task.Factory.StartNew( RunWorker );
}

IEnumberable<Item> GetInitialWork() { ... }

public void RunWorker() {
  try  {
    do {
      Item i = worklist.Take( cts.Token );
      //blocks until item available or cancelled
          Process(i);
      //exit loop if no more items left
    } while (Interlocked.Decrement( ref itemcount) > 0);
  } finally {
      if( ! cts.IsCancellationRequested )
        cts.Cancel();
    }
  }
}

public void AddWork( Item item) {
  Interlocked.Increment( ref itemcount );
  worklist.Add(item);
}

public void Process( Item i ) 
{
  //Do what you want to the work item here.
}

前面的代码允许您将工作列表项添加到队列中,并允许您设置任意数量的工作人员(在本例中为四个)以将项目拉出队列并进行处理。

.Net 4.0 上的并行性的另一个重要资源是“使用 Microsoft .Net 进行并行编程”一书,可在以下网址免费获得:http://msdn.microsoft.com/en-us/library/ff963553

【讨论】:

  • +1 请帮帮我。我不知道如何从该链接中免费获得这本书。
  • @Blam 只需点击左侧菜单中的部分。
  • 谢谢,我的左边塌了。
【解决方案3】:

在任务并行库的内部,Parallel.For 和 Parallel.Foreach 遵循爬山算法来确定应该为操作使用多少并行度。

或多或少,他们从在一项任务上运行主体开始,移动到两项,依此类推,直到达到断点,他们需要减少任务的数量。

这对于快速完成的方法体非常有效,但如果方法体需要很长时间才能运行,它可能需要很长时间才能意识到它需要减少并行度。在此之前,它会继续添加任务,并可能导致计算机崩溃。

我在 Task Parallel Library 的一位开发人员的讲座中了解到上述内容。

指定 MaxDegreeOfParallelism 可能是最简单的方法。

【讨论】:

  • ThreadPool 使用爬山,而不是 Parallel.ForEach。这开始与TaskScheduler 一样多的Tasks 将让它运行。而且由于这里似乎Tasks 可以阻塞(相对)很长时间,这将是一个很高的数字。但是,是的,MDOP 可能是在这里做的最好的事情。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-05-14
  • 1970-01-01
  • 2018-02-22
  • 1970-01-01
  • 2018-11-06
相关资源
最近更新 更多