我认为这是一个有趣的问题,所以我花了一些时间。
我理解的场景是这样的:
- 您的 BlockingCollection 已满
- 启动了多个线程,每个线程都试图添加到 BlockingCollection。这些调用都会阻塞;这就是为什么它们需要并行发生。
- 当空间变得可用时,添加调用将变得畅通无阻。
- 对
Add 的调用需要按照收到的顺序完成。
首先,我们来谈谈代码结构。我建议不要使用 BlockingCollection 并围绕它编写程序代码,而是扩展 BlockingCollection 并将 Add 方法替换为您需要的功能。它可能看起来像这样:
public class QueuedBlockingCollection<T> : BlockingCollection<T>
{
private FifoMonitor monitor = new FifoMonitor();
public QueuedBlockingCollection(int max) : base (max) {}
public void Enqueue(T item)
{
using (monitor.Lock())
{
base.Add(item);
}
}
}
这里的诀窍是使用FifoMonitor 类,它可以为您提供lock 的功能,但会强制执行顺序。不幸的是,CLR 中不存在这样的类。但是我们可以write one:
public class FifoMonitor
{
public class FifoCriticalSection : IDisposable
{
private readonly FifoMonitor _parent;
public FifoCriticalSection(FifoMonitor parent)
{
_parent = parent;
_parent.Enter();
}
public void Dispose()
{
_parent.Exit();
}
}
private object _innerLock = new object();
private volatile int counter = 0;
private volatile int current = 1;
public FifoCriticalSection Lock()
{
return new FifoCriticalSection(this);
}
private void Enter()
{
int mine = Interlocked.Increment(ref counter);
Monitor.Enter(_innerLock);
while (current != mine) Monitor.Wait(_innerLock);
}
private void Exit()
{
Interlocked.Increment(ref current);
Monitor.PulseAll(_innerLock);
Monitor.Exit(_innerLock);
}
}
现在进行测试。这是我的程序:
public class Program
{
public static void Main()
{
//Setup
var blockingCollection = new QueuedBlockingCollection<int>(10);
var tasks = new Task[10];
//Block the collection by filling it up
for (int i=1; i<=10; i++) blockingCollection.Add(99);
//Start 10 threads all trying to add another value
for (int i=1; i<=10; i++)
{
int index = i; //unclose
tasks[index-1] = Task.Run( () => blockingCollection.Enqueue(index) );
Task.Delay(100).Wait(); //Wait long enough for the Enqueue call to block
}
//Purge the collection, making room for more values
while (blockingCollection.Count > 0)
{
var n = blockingCollection.Take();
Console.WriteLine(n);
}
//Wait for our pending adds to complete
Task.WaitAll(tasks);
//Display the collection in the order read
while (blockingCollection.Count > 0)
{
var n = blockingCollection.Take();
Console.WriteLine(n);
}
}
}
输出:
99
99
99
99
99
99
99
99
99
99
1
2
3
4
5
6
7
8
9
10
看起来有效!但为了确定起见,我将Enqueue 改回Add,以确保该解决方案确实有所作为。果然,它以常规的Add 结束。
99
99
99
99
99
99
99
99
99
99
2
3
4
6
1
5
7
8
9
10
查看DotNetFiddle上的代码