【问题标题】:C# producer/consumerC# 生产者/消费者
【发布时间】:2009-11-01 04:36:40
【问题描述】:

我最近遇到了一个生产者/消费者模式 c# 实现。它非常简单并且(至少对我而言)非常优雅。

它似乎是在 2006 年左右设计的,所以我想知道这个实现是否是
- 安全
- 仍然适用

代码如下(原代码引用于http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("\t\t\t\tConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}

【问题讨论】:

  • 您的目标是哪个版本的 .NET?
  • 您好,lboregard,有一个使用 BlockingCollectionthis link 中实现简单的生产者-消费者模式的工作示例。

标签: c# design-patterns monitor producer-consumer


【解决方案1】:

代码比那更旧 - 我在 .NET 2.0 出现之前写了一段时间。生产者/消费者队列的概念比那个方式老了:)

是的,据我所知,该代码是安全的 - 但它有一些缺陷:

  • 它是非通用的。现代版本肯定是通用的。
  • 它无法停止队列。停止队列(以便所有消费者线程退出)的一种简单方法是拥有一个可以放入队列的“停止工作”令牌。然后,您可以添加与线程一样多的令牌。或者,您有一个单独的标志来指示您要停止。 (这允许其他线程在完成队列中的所有当前工作之前停止。)
  • 如果作业非常小,一次处理一个作业可能不是最有效的做法。

说实话,代码背后的想法比代码本身更重要。

【讨论】:

  • 脉冲不一定会唤醒消费者,我知道这不太可能,但理论上只有生产者才能无限运行。我还测量了监视器等待/脉冲与事件等待句柄相比没有性能优势
  • @TakeMeAsAGuest:不知道你对第一部分的意思是什么——你是说你已经看到线程一直在监视器上等待但脉冲什么也没做的情况?至于性能——有很多很多不同的场景需要考虑(硬件、软件、等待线程的数量等)。我看看能不能在 Joe Duffy 的书中找到一些参考资料……
【解决方案2】:

您可以执行以下代码 sn-p 之类的操作。它是通用的,并且有一种方法可以将空值(或您想使用的任何标志)排入队列以告诉工作线程退出。

代码取自这里:http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}

【讨论】:

  • 这是迄今为止最好的生产者消费者模式的实现。我最近在我的多线程应用程序中使用了它,即使在 1000-1500 个线程下它也能正常工作。
  • 我确定我在这里遗漏了一些东西(我的 C# 技能非常生疏了)但是这个例子没有调用消费 task 上的方法(与引用的non-generic code 不同,它在消费者中存储和调用Action 委托。)那么,如果它不演示对消费任务的方法调用,那么使这个泛型有什么意义呢? (试图围绕这个示例与引用的实现进行比较。)
  • @Inactivist 您可以将 Action 添加到 ctor,另存为字段 _action,然后在执行 Dequeue() 后立即调用它,如下所示:_action(task);
  • 是的,正如@Inactivist 所说,只需在构造函数中传递一个 Action ,并将出队的对象传递给它(在锁之外)。对不起,我错过了这个。顺便说一句,现在这有点多余,因为阻塞集合/并发队列和并行库 - 甚至 RX 更容易!
  • 你能用 Marcus 提到的正确代码更新你的答案吗?
【解决方案3】:

那天我从上面的代码和article series 中了解了 Monitor.Wait/Pulse 的工作原理(以及很多关于线程的一般知识)。所以正如 Jon 所说,它具有很大的价值,并且确实是安全和适用的。

但是,从 .NET 4 开始,有一个 producer-consumer queue implementation in the framework。我自己才发现它,但到目前为止,它可以满足我的一切需求。

【讨论】:

    【解决方案4】:

    警告:如果您阅读 cmets,您就会明白我的回答是错误的 :)

    您的代码中可能存在死锁

    想象以下情况,为了清楚起见,我使用了单线程方法,但应该很容易转换为带睡眠的多线程:

    // We create some actions...
    object locker = new object();
    
    Action action1 = () => {
        lock (locker)
        {
            System.Threading.Monitor.Wait(locker);
            Console.WriteLine("This is action1");
        }
    };
    
    Action action2 = () => {
        lock (locker)
        {
            System.Threading.Monitor.Wait(locker);
            Console.WriteLine("This is action2");
        }
    };
    
    // ... (stuff happens, etc.)
    
    // Imagine both actions were running
    // and there's 0 items in the queue
    
    // And now the producer kicks in...
    lock (locker)
    {
        // This would add a job to the queue
    
        Console.WriteLine("Pulse now!");
        System.Threading.Monitor.Pulse(locker);
    }
    
    // ... (more stuff)
    // and the actions finish now!
    
    Console.WriteLine("Consume action!");
    action1(); // Oops... they're locked...
    action2();
    

    如果这没有任何意义,请告诉我。

    如果这一点得到确认,那么您的问题的答案是“不,这不安全”;) 我希望这会有所帮助。

    【讨论】:

    • 我没有看到原始发布者的代码有任何死锁,因为有时可以将项目添加到队列中,当每个消费者都在锁之外(在这种情况下,消费者会在下一次它获取锁,观察队列不为空,因此不必等待)或等待脉冲(在这种情况下,脉冲保证唤醒消费者)。
    • 我必须承认我几乎忘记了我在想什么的细节。回想起来,我相信问题出在 while 循环上。它将保持消费者线程锁定,直到队列中有东西,这会阻止生产者锁定和排队。这有意义吗?
    • Monitor.Wait() 方法释放它正在等待的锁,直到某个其他线程发出脉冲为止。消费者线程将在 Consume 方法中被阻塞,但这不会阻止生产者向它提供东西。最大的危险是,如果生产者在生成消费者期望的所有数据之前退出,消费者将永远等待永远不会到达的东西。这可以通过例如处理有一个AllDone 标志。
    • 如果最后一个生产者设置 AllDone 然后脉冲监视器,如果 Consume 方法检查 AllDone 作为其 while 循环条件的一部分,并且在看到 AllDone 脉冲监视器然后退出(通过不返回任何内容或抛出异常),然后即使有多个消费者(它将以任意顺序处理队列项),所有等待的消费者也会被唤醒并被告知退出。
    • 谢谢! :) 现在有意义
    【解决方案5】:
    public class ProducerConsumerProblem
        {
            private int n;
            object obj = new object();
            public ProducerConsumerProblem(int n)
            {
                this.n = n;
            }
    
            public void Producer()
            {
    
                for (int i = 0; i < n; i++)
                {
                    lock (obj)
                    {
                        Console.Write("Producer =>");
                        System.Threading.Monitor.Pulse(obj);
                        System.Threading.Thread.Sleep(1);
                        System.Threading.Monitor.Wait(obj);
                    }
                }
            }
    
            public void Consumer()
            {
                lock (obj)
                {
                    for (int i = 0; i < n; i++)
                    {
                        System.Threading.Monitor.Wait(obj, 10);
                        Console.Write("<= Consumer");
                        System.Threading.Monitor.Pulse(obj);
                        Console.WriteLine();
                    }
                }
            }
        }
    
        public class Program
        {
            static void Main(string[] args)
            {
                ProducerConsumerProblem f = new ProducerConsumerProblem(10);
                System.Threading.Thread t1 = new System.Threading.Thread(() => f.Producer());
                System.Threading.Thread t2 = new System.Threading.Thread(() => f.Consumer());
                t1.IsBackground = true;
                t2.IsBackground = true;
                t1.Start();
                t2.Start();
                Console.ReadLine();
            }
        }
    

    输出

    Producer =><= Consumer
    Producer =><= Consumer
    Producer =><= Consumer
    Producer =><= Consumer
    Producer =><= Consumer
    Producer =><= Consumer
    Producer =><= Consumer
    Producer =><= Consumer
    Producer =><= Consumer
    Producer =><= Consumer
    

    【讨论】:

      猜你喜欢
      • 2012-03-12
      • 2018-09-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-12-14
      • 1970-01-01
      相关资源
      最近更新 更多