【发布时间】:2015-10-15 15:29:47
【问题描述】:
我正试图一口气从ConcurrentBag 获取所有项目。由于集合中没有像 TryEmpty 这样的东西,因此我采用了与此处所述相同的方式使用 Interlocked.Exchange:How to remove all Items from ConcurrentBag?
我的代码如下所示:
private ConcurrentBag<Foo> _allFoos; //Initialized in constructor.
public bool LotsOfThreadsAccessingThisMethod(Foo toInsert)
{
this._allFoos.Add(toInsert);
return true;
}
public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
var token = (CancellationToken) state;
var workingSet = new List<Foo>();
while (!token.IsCancellationRequested)
{
if (!workingSet.Any())
{
workingSet = Interlocked.Exchange(ref this._allFoos, new ConcurrentBag<Foo>).ToList();
}
var processingCount = (int)Math.Min(workingSet.Count, TRANSACTION_LIMIT);
if (processingCount > 0)
{
using (var ctx = new MyEntityFrameworkContext())
{
ctx.BulkInsert(workingSet.Take(processingCount));
}
workingSet.RemoveRange(0, processingCount);
}
}
}
问题是这有时会遗漏添加到列表中的项目。我编写了一个测试应用程序,它将数据提供给我的ConcurrentBag.Add 方法并验证它正在发送所有数据。当我在Add 调用上设置断点并检查ConcurrentBag 之后的计数时,它为零。该项目只是没有被添加。
我相当肯定这是因为 Interlocked.Exchange 调用没有使用 ConcurrentBag 的内部锁定机制,所以它在交换的某个地方丢失了数据,但我不知道实际发生了什么。
如何在不借助自己的锁定机制的情况下一次从ConcurrentBag 中取出所有项目?还有为什么Add 会忽略这个项目?
【问题讨论】:
-
看起来您正在实现生产者-消费者模式。您是否考虑过使用 TPL 数据流?
-
不,实际上我以前从未见过。现在阅读它。
-
_"无需求助于我自己的锁定机制" -- 既然
ConcurrentBag一开始就没有提供这样的机制,为什么你认为甚至可以在不实现自己的情况下这样做?用lock包围代码并完成它有什么问题?请注意,InterlockedExchange仅在所有其他访问使用相同时才保护变量;否则,您仍然有可能以不同步的方式更改变量(即,即使您尝试从集合中提取项目,其他一些代码也会获取旧值并使用它)。 -
如果您只是尝试实现生产者-消费者,请注意
BlockingCollection<T>(您可以使用ConcurrentBag<T>对其进行初始化)提供易于使用、直接的生产者-消费者语义。如果你真正想要的只是生产者-消费者,你可以使用它。 -
那么如果在访问元素时没有内部锁定机制,
ConcurrentBag相对于列表有什么好处?
标签: c# multithreading concurrency interlocked