【问题标题】:.NET producer-consumer questions.NET 生产者-消费者问题
【发布时间】:2011-07-27 01:45:56
【问题描述】:

我正在为 Web 服务编写一个相对简单的“代理”应用程序。总体思路是 TCP 服务器(带异步连接)将从客户端读取(字符串)数据并将该数据(作为读取回调函数的一部分)放入两个队列(Q1 和 Q2)之一。另一个线程将读取这些队列中的数据并将其传递给 Web 服务。第一季度的数据应该优先于第二季度的任何数据。

我一直在阅读有关生产者/消费者模式的信息,这似乎就是我试图在队列方面实现的内容。由于我的入队和出队操作将发生在不同的线程上,很明显我的队列需要是线程安全的并支持某种锁定机制?这是一个 .NET 4.0 应用程序,我看到了有关新 BlockingCollection 和 ConcurrentQueue 类的文档,但我不确定到底有什么区别,或者我将如何在这种情况下实现它们。任何人都可以对此有所了解吗?谢谢!

【问题讨论】:

    标签: .net queue producer-consumer


    【解决方案1】:

    我会像下面的课程那样做。当您生成一个项目以将其添加到其中一个队列时,您调用Enqueue()。此方法总是(几乎)立即返回。在另一个线程中,当您准备好使用某个项目时,您调用Dequeue()。它尝试首先从高优先级队列中获取。如果此时任何队列中都没有可用的项目,则调用阻塞。完成制作后,请致电Complete()。在进行该调用并且两个队列都为空之后,对Dequeue() 的下一个调用(或当前阻塞的调用)将抛出InvalidOperationException

    如果您的生产者可以在很长一段时间内比您的消费者更快,您应该绑定队列 (new BlockingCollection<T>(capacity))。但是在这种情况下,如果您只有一个线程同时产生低优先级和高优先级项目,则高优先级项目可能不得不等待低优先级项目。您可以通过让一个线程用于生成高优先级项目和一个用于低优先级项目的线程来解决此问题。或者你可以只绑定高优先级队列并希望你不会一次得到一百万个低优先级项目。

    class Worker<T>
    {
        BlockingCollection<T> m_highPriorityQueue = new BlockingCollection<T>();
        BlockingCollection<T> m_lowPriorityQueue = new BlockingCollection<T>();
    
        public void Enqueue(T item, bool highPriority)
        {
            BlockingCollection<T> queue;
            if (highPriority)
                queue = m_highPriorityQueue;
            else
                queue = m_lowPriorityQueue;
    
            queue.Add(item);
        }
    
        public T Dequeue()
        {
            T result;
    
            if (!m_highPriorityQueue.IsCompleted)
            {
                if (m_highPriorityQueue.TryTake(out result))
                    return result;
            }
    
            if (!m_lowPriorityQueue.IsCompleted)
            {
                if (m_lowPriorityQueue.TryTake(out result))
                    return result;
            }
    
            if (m_highPriorityQueue.IsCompleted && m_lowPriorityQueue.IsCompleted)
                throw new InvalidOperationException("All work is done.");
            else
            {
                try
                {
                    BlockingCollection<T>.TakeFromAny(
                        new[] { m_highPriorityQueue, m_lowPriorityQueue },
                        out result);
                }
                catch (ArgumentException ex)
                {
                    throw new InvalidOperationException("All work is done.", ex);
                }
    
                return result;
            }
        }
    
        public void Complete()
        {
            m_highPriorityQueue.CompleteAdding();
            m_lowPriorityQueue.CompleteAdding();
        }
    }
    

    【讨论】:

    • 非常感谢!非常感谢您提供的信息和建议!
    【解决方案2】:

    BlockingCollection 默认使用 ConcurrentQueue。应该非常适合您的应用程序。如果您将 F# 与邮箱和异步块一起使用,可能会更容易。我之前发布了一个常见实现的示例帖子。

    Map/reduce with F# Agent or MailboxProcessor

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-08-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多