【发布时间】:2012-06-22 00:07:51
【问题描述】:
下面的代码继续创建线程,即使队列为空..直到最终发生 OutOfMemory 异常。如果我用常规 foreach 替换 Parallel.ForEach,则不会发生这种情况。有人知道为什么会发生这种情况吗?
public delegate void DataChangedDelegate(DataItem obj);
public class Consumer
{
public DataChangedDelegate OnCustomerChanged;
public DataChangedDelegate OnOrdersChanged;
private CancellationTokenSource cts;
private CancellationToken ct;
private BlockingCollection<DataItem> queue;
public Consumer(BlockingCollection<DataItem> queue) {
this.queue = queue;
Start();
}
private void Start() {
cts = new CancellationTokenSource();
ct = cts.Token;
Task.Factory.StartNew(() => DoWork(), ct);
}
private void DoWork() {
Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
if (item.DataType == DataTypes.Customer) {
OnCustomerChanged(item);
} else if(item.DataType == DataTypes.Order) {
OnOrdersChanged(item);
}
});
}
}
【问题讨论】:
-
您是否尝试过以低并行度(如 4、8 等)运行 foreach 并查看问题是否仍然存在?
-
这确实限制了创建的线程数量(但我无法跟上生产者的步伐)。那么这对原始代码意味着什么? TPL 池是否应妥善管理此问题?
-
如何实例化消费者?你可能是在循环中执行此操作吗?
标签: c# .net multithreading task-parallel-library