【问题标题】:Can a C# blocking FIFO queue leak messages?C# 阻塞 FIFO 队列会泄漏消息吗?
【发布时间】:2010-10-09 22:46:45
【问题描述】:

我正在从事一个学术开源项目,现在我需要在 C# 中创建一个快速阻塞 FIFO 队列。我的第一个实现只是在读取器的信号量中包装了一个同步队列(带动态扩展),然后我决定以以下(理论上更快)的方式重新实现

public class FastFifoQueue<T>
{
    private T[] _array;
    private int _head, _tail, _count;
    private readonly int _capacity;
    private readonly Semaphore _readSema, _writeSema;

    /// <summary>
    /// Initializes FastFifoQueue with the specified capacity
    /// </summary>
    /// <param name="size">Maximum number of elements to store</param>
    public FastFifoQueue(int size)
    {
        //Check if size is power of 2
        //Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
        if ((size & (size - 1)) != 0)
            throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");

        _capacity = size;
        _array = new T[size];
        _count = 0;
        _head = int.MinValue; //0 is the same!
        _tail = int.MinValue;

        _readSema = new Semaphore(0, _capacity);
        _writeSema = new Semaphore(_capacity, _capacity);
    }

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        Interlocked.Exchange(ref _array[index], item);
        Interlocked.Increment(ref _count);
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne();
        int index = Interlocked.Increment(ref _tail);
        index %= _capacity;
        if (index < 0) index += _capacity;
        T ret = Interlocked.Exchange(ref _array[index], null);
        Interlocked.Decrement(ref _count);
        _writeSema.Release();

        return ret;
    }

    public int Count
    {
        get
        {
            return _count;
        }
    }
}

这是我们在教科书中找到的带有静态数组的经典 FIFO 队列实现。它旨在以原子方式递增指针,并且由于我无法使指针在达到(容量-1)时回到零,因此我计算模数。理论上,使用 Interlocked 与在进行增量之前锁定相同,并且由于存在信号量,因此可以有多个生产者/消费者进入队列,但一次只有一个能够修改队列指针。 首先,因为 Interlocked.Increment 先递增,然后返回,我已经明白我仅限于使用后递增值并从数组中的位置 1 开始存储项目。没问题,到了一定值我就回0了

它有什么问题? 您不会相信,在重负载下运行时,有时队列会返回 NULL 值。我确定,重复一遍,我确定,没有方法将 null 排入队列。这绝对是正确的,因为我尝试在 Enqueue 中进行空检查以确保没有抛出任何错误。我用 Visual Studio 为此创建了一个测试用例(顺便说一句,我使用像 maaaaaaaany 人这样的双核 CPU)

    private int _errors;

    [TestMethod()]
    public void ConcurrencyTest()
    {
        const int size = 3; //Perform more tests changing it
        _errors = 0;
        IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
        Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
        Thread[] producers = new Thread[size], consumers = new Thread[size];

        for (int i = 0; i < size; i++)
        {
            producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
            consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
            producers[i].Start(queue);
            consumers[i].Start(queue);
        }

        Thread.Sleep(new TimeSpan(0, 0, 1, 0));

        for (int i = 0; i < size; i++)
        {
            producers[i].Abort();
            consumers[i].Abort();
        }

        Assert.AreEqual(0, _errors);
    }

    private void LoopProducer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                try
                {
                    q.Enqueue(new object());
                }
                catch
                { }

            }
        }
        catch (ThreadAbortException)
        { }
    }

    private void LoopConsumer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                object item = q.Dequeue();
                if (item == null) Interlocked.Increment(ref _errors);
            }
        }
        catch (ThreadAbortException)
        { }

    }

一旦消费者线程得到一个空值,就会计算一个错误。 当使用 1 个生产者和 1 个消费者执行测试时,它成功了。当使用 2 个生产者和 2 个消费者或更多进行测试时,会发生灾难:甚至检测到 2000 次泄漏。我发现问题可能出在 Enqueue 方法中。根据设计合同,生产者只能写入空单元格 (null),但是通过一些诊断修改我的代码,我发现有时生产者试图在非空单元格上写入,然后被“好”数据占用。

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        T leak = Interlocked.Exchange(ref _array[index], item);

        //Diagnostic code
        if (leak != null)
        {
            throw new InvalidOperationException("Too bad...");
        }
        Interlocked.Increment(ref _count);

        _readSema.Release();
    }

“太糟糕了”的异常经常发生。但是并发写入引发冲突太奇怪了,因为增量是原子的,并且写入器的信号量只允许与空闲数组单元一样多的写入器。

有人可以帮我吗?如果您能与我分享您的技能和经验,我将不胜感激。

谢谢。

【问题讨论】:

  • 有什么理由不使用BlockingCollection<T>
  • 也许 Interlocked.Exchange 在阵列上的行为很有趣并打破了波动性?
  • @dtb:我从来不知道那门课。我的软件主要在 Mono 上工作(我没有提到,因为我不认为它很重要),据我所知 Mono 对 .NET 4 的支持不稳定(但我可能是错的)所以这就是为什么我没有导入 4.0 dll,我没有看到类。无论如何,我会看看它,谢谢 - @pst:这可能是 Windows 上的 .NET 错误吗?我还没有在 Mono 中测试过
  • 锁定整个方法并使用 volatile 共享变量时问题是否仍然存在?

标签: c# concurrency queue fifo interlocked-increment


【解决方案1】:

我必须说,这是一个非常聪明的想法,我想了一段时间,然后才开始意识到哪里(我认为) 错误就在这里。所以,一方面,想出这样一个聪明的设计是值得称赞的!但是,与此同时,为你展示“Kernighan's Law”感到羞耻

首先,调试的难度是编写代码的两倍。因此,如果您尽可能巧妙地编写代码,那么根据定义,您还不够聪明,无法对其进行调试。

问题基本上是这样的:您假设WaitOneRelease 有效地序列化您所有的EnqueueDequeue 操作;但这并不是这里发生的事情。请记住,Semaphore 类用于限制访问资源的线程数不是以确保特定的事件顺序。 每个WaitOneRelease 之间发生的事情不能保证以与WaitOneRelease 调用自身相同的“线程顺序”发生。

这很难用文字来解释,所以让我试着提供一个视觉说明。

假设您的队列容量为 8,看起来像这样(让 0 代表 nullx 代表一个对象):

[ x x x x x x x x ]

所以Enqueue 已被调用 8 次,队列已满。因此,您的_writeSema 信号量将阻塞WaitOne,而您的_readSema 信号量将立即返回WaitOne

现在让我们假设Dequeue 在 3 个不同的线程上或多或少地同时被调用。我们称它们为 T1、T2 和 T3。

在继续之前,让我为您的 Dequeue 实现应用一些标签,以供参考:

public T Dequeue()
{
    _readSema.WaitOne();                                   // A
    int index = Interlocked.Increment(ref _tail);          // B
    index %= _capacity;
    if (index < 0) index += _capacity;
    T ret = Interlocked.Exchange(ref _array[index], null); // C
    Interlocked.Decrement(ref _count);
    _writeSema.Release();                                  // D

    return ret;
}

好的,所以 T1、T2 和 T3 都经过了 A 点。然后为简单起见,假设它们每个都“按顺序”到达行 B,因此 T1 的 index 为 0,T2 的 index 为 1,T3 的 index 为2.

到目前为止一切顺利。但这里有个问题:不能保证从这里开始,T1、T2 和 T3 会以任何指定的顺序到达 D。假设 T3 实际上比 T1 和 T2领先,越过 C 行(因此将 _array[2] 设置为 null)并一直移动到 D 行

在此之后,_writeSema 将发出信号,这意味着您的队列中有一个可用的插槽可以写入,对吗? 但您的队列现在看起来像这样!

[ x x 0 x x x x x ]

因此,如果同时出现另一个线程并调用Enqueue,它实际上将过去 _writeSema.WaitOne,增加_head,并获得index 0 , 即使插槽 0 不是空的。这样做的结果是,槽 0 中的项目实际上可能在 T1(还记得他吗?)读取它之前被覆盖

要了解您的null 值来自哪里,您只需想象我刚才描述的过程的相反过程。也就是说,假设您的队列如下所示:

[ 0 0 0 0 0 0 0 0 ]

三个线程 T1、T2 和 T3 几乎同时调用Enqueue。 T3 递增_head last,但插入其项目(在_array[2])并调用_readSema.Release first,导致_readSema 发出信号,但队列看起来像:

[ 0 0 x 0 0 0 0 0 ]

因此,如果同时有另一个线程调用Dequeue(在 T1 和 T2 完成它们的工作之前),它将通过 _readSema.WaitOne,增加 _tail,并得到一个 @987654361 @ of 0,即使插槽 0 为空

所以这是你的问题。至于解决方案,我目前没有任何建议。给我一些时间考虑一下......(我现在发布这个答案是因为它在我的脑海中很新鲜,我觉得它可能会对你有所帮助。)

【讨论】:

  • 因此,使用这种设计,当任何写入者处于“入队”状态时,不允许任何读取器继续,而当任何读取器处于“出队”时,任何写入器都不能被允许继续。
  • 灭你丹!!!感谢你让我意识到我的代码中可怕的设计缺陷!现在我正在考虑它......重要的是必须按照线程增加指针的顺序释放信号量,但写入顺序没有约束。但是,我必须记住,T[](当 T 是一个类时)为每个单元格保存一个 reference,即。一个指针。在所有平台上,复制指针都像原子操作一样快。所以我相信锁定整个方法一次而不是使用复杂的互斥模式会更好。如果你有更多的想法,我来了 ;)
【解决方案2】:

(+1给我投票的丹涛有答案) enqueue 会改成这样的……

while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
    ;

dequeue 会改成这样...

while( (ret = Interlocked.Exchange(ref _array[index], null)) == null)
    ;

这建立在 Dan Tao 的出色分析之上。因为索引是原子获得的,所以(假设没有线程在 enqueue 或 dequeue 方法中死亡或终止)读者可以保证最终填充他的单元格,或者保证写入者最终可以释放他的单元格(null)。

【讨论】:

    【解决方案3】:

    谢谢 Dan Tao 和 Les,

    非常感谢您的帮助。 Dan,你打开了我的思路:临界区中有多少生产者/消费者并不重要,重要的是 锁按顺序释放。莱斯,你找到了问题的解决方案。

    现在是时候用我在你们俩的帮助下编写的最终代码来最终回答我自己的问题了。嗯,虽然不多,但是对 Les 的代码做了一点增强

    入队:

    while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
                Thread.Sleep(0);
    

    出队:

    while ((ret = Interlocked.Exchange(ref _array[index], null)) == null)
                Thread.Sleep(0);
    

    为什么是 Thread.Sleep(0)?当我们发现无法检索/存储元素时,为什么要立即再次检查?我需要强制上下文切换以允许其他线程读/写。显然,下一个将被调度的线程可能是另一个无法操作的线程,但至少我们是强制的。来源:http://progfeatures.blogspot.com/2009/05/how-to-force-thread-to-perform-context.html

    我还测试了前一个测试用例的代码来证明我的主张:

    不睡觉(0)

    Read 6164150 elements
    Wrote 6322541 elements
    Read 5885192 elements
    Wrote 5785144 elements
    Wrote 6439924 elements
    Read 6497471 elements
    

    带睡眠(0)

    Wrote 7135907 elements
    Read 6361996 elements
    Wrote 6761158 elements
    Read 6203202 elements
    Wrote 5257581 elements
    Read 6587568 elements
    

    我知道这不是一个“伟大”的发现,我不会因为这些数字而获得图灵奖。性能增量并不显着,但大于零。强制上下文切换允许执行更多的 RW 操作(=更高的吞吐量)。

    要明确:在我的测试中,我只是评估队列的性能,而不是模拟生产者/消费者问题,所以不要关心是否在一分钟后测试结束队列中仍有元素。但我只是证明了我的方法是有效的,谢谢大家。

    可作为 MS-RL 开源的代码:http://logbus-ng.svn.sourceforge.net/viewvc/logbus-ng/trunk/logbus-core/It.Unina.Dis.Logbus/Utils/FastFifoQueue.cs?revision=461&view=markup

    【讨论】:

    • 我可能错了,但我认为 Interlocked 例程知道它们是在单处理器环境还是多处理器环境中运行。如果在单个 proc env 中,那么它们应该推迟以避免占用 CPU(唯一的 CPU)。
    • 一年多后重新阅读我的答案让我相信我以错误的方式写了正确的东西
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-16
    • 2017-10-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多