【发布时间】:2011-06-25 22:44:02
【问题描述】:
我正在阅读的一本书是“多处理器编程的艺术,作者 Maurice Herlihy 和 Nir Shavit”。在其中,有一个“无等待”队列(经过一些语言适应后)在线程环境中完美地测试和逻辑功能 - 至少,即使分布在 5 个线程中的 10,000,000 个项目也没有冲突,并且逻辑检查。
(我编辑了队列以在无法获取项目时返回false,而不是抛出异常。代码如下)。
但是,它有一个警告;队列不能增长。粗略的逻辑检查表明,如果不锁定队列,它就无法增长 - 这在某种程度上否定了拥有无锁队列的意义。
因此,目的是创建一个可以增长的无锁(或至少无饥饿锁)队列。
所以:如果我们本质上给每个线程自己的共享队列,以一种不矛盾的方式(并且接受这个问题已经解决的可能性很高,并且为了边做边学更好):
WaitFreeQueue<Queue<int>> queues = new WaitFreeQueue<Queue<int>>(threadCount);
// Dequeue a queue, enqueue an item, enqueue the queue.
// Dequeue a queue, dequeue an item, enqueue the queue.
还有免等待队列(以前的代码包含在 cmets 中,以防我做出任何重大更改):
/// <summary>
/// A wait-free queue for use in threaded environments.
/// Significantly adapted from "The Art of Multiprocessor Programming by Maurice Herlihy and Nir Shavit".
/// </summary>
/// <typeparam name="T">The type of item in the queue.</typeparam>
public class WaitFreeQueue<T>
{
/// <summary>
/// The index to dequeue from.
/// </summary>
protected int head;
/// <summary>
/// The index to queue to.
/// </summary>
protected int tail;
/// <summary>
/// The array to queue in.
/// </summary>
protected T[] items;
/// <summary>
/// The number of items queued.
/// </summary>
public int Count
{
get { return tail - head; }
}
/// <summary>
/// Creates a new wait-free queue.
/// </summary>
/// <param name="capacity">The capacity of the queue.</param>
public WaitFreeQueue(int capacity)
{
items = new T[capacity];
head = 0; tail = 0;
}
/// <summary>
/// Attempts to enqueue an item.
/// </summary>
/// <param name="value">The item to enqueue.</param>
/// <returns>Returns false if there was no room in the queue.</returns>
public bool Enqueue(T value)
{
if (tail - head == items.Length)
// throw new IndexOutOfRangeException();
return false;
items[tail % items.Length] = value;
System.Threading.Interlocked.Increment(ref tail);
return true;
// tail++;
}
/// <summary>
/// Attempts to dequeue an item.
/// </summary>
/// <param name="r">The variable to dequeue to.</param>
/// <returns>Returns true if there was an available item to dequeue.</returns>
public bool Dequeue(out T r)
{
if (tail - head == 0)
// throw new InvalidOperationException("No more items.");
{ r = default(T); return false; }
r = items[head % items.Length];
System.Threading.Interlocked.Increment(ref head);
// head++;
// return r;
return true;
}
}
那么:这行得通吗?如果不是,为什么?如果是这样,是否还有任何可预见的问题?
谢谢。
【问题讨论】:
-
你为什么不直接使用
ConcurrentQueue<T>?或者这是一个学习练习? -
如果两个线程同时调用
Enqueue,并且在第一个线程执行items[tail % items.Length] = value;后控制从第一个线程切换到第二个线程,您将如何处理? -
我的理解是,它不会在同一时间读取和写入同一位置,因此不会发生冲突。老实说,我可能需要再次重新阅读逻辑部分,但测试和我的理解都表明它有效。当然,我对如何做有点模糊,但我正在学习。
标签: c# multithreading queue