【问题标题】:producer-consumer with a resource拥有资源的生产者-消费者
【发布时间】:2011-05-15 05:36:36
【问题描述】:

我正在尝试使用一组资源来实现生产者/消费者模式,因此每个线程都有一个与之关联的资源。例如,我可能有一个任务队列,其中每个任务都需要StreamWriter 来写入其结果。每个任务还​​必须有参数传递给它。

我从 Joseph Albahari 的实现开始(见下文我的修改版本)。

我将Action 的队列替换为Action<T> 的队列,其中T 是资源,并将与线程关联的资源传递给Action。但是,这给我留下了如何将参数传递给Action 的问题。显然,Action 必须替换为委托,但这留下了在任务入队时如何传递参数的问题(来自ProducerConsumerQueue 类之外)。关于如何做到这一点的任何想法?

class ProducerConsumerQueue<T>
    {
        readonly object _locker = new object();            
        Thread[] _workers;
        Queue<Action<T>> _itemQ = new Queue<Action<T>>();

        public ProducerConsumerQueue(T[] resources)
        {
            _workers = new Thread[resources.Length];

            // Create and start a separate thread for each worker
            for (int i = 0; i < resources.Length; i++)
            {
                Thread thread = new Thread(() => Consume(resources[i]));
                thread.SetApartmentState(ApartmentState.STA);
                _workers[i] = thread;
                _workers[i].Start();
            }
        }        

        public void Shutdown(bool waitForWorkers)
        {
            // Enqueue one null item per worker to make each exit.
            foreach (Thread worker in _workers)
                EnqueueItem(null);

            // Wait for workers to finish
            if (waitForWorkers)
                foreach (Thread worker in _workers)
                    worker.Join();
        }

        public void EnqueueItem(Action<T> item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);           // We must pulse because we're
                Monitor.Pulse(_locker);         // changing a blocking condition.
            }
        }

        void Consume(T parameter)
        {
            while (true)                        // Keep consuming until
            {                                   // told otherwise.
                Action<T> item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0) Monitor.Wait(_locker);
                    item = _itemQ.Dequeue();
                }
                if (item == null) return;         // This signals our exit.
                item(parameter);                           // Execute item.
            }
        }
    }

【问题讨论】:

    标签: c# multithreading producer-consumer


    【解决方案1】:

    ProducerConsumerQueue&lt;T&gt; 中的类型 T 不一定是您的资源,它可以是包含您的资源的复合类型。使用 .NET4 最简单的方法是使用 Tuple&lt;StreamWriter, YourParameterType&gt;。生产/消费者队列只是吃掉并吐出T,因此在您的Action&lt;T&gt; 中,您可以只使用属性来获取资源和参数。如果您使用Tuple,您将使用Item1 获取资源并使用Item2 获取参数。

    如果您不使用 .NET4,过程类似,但您只需创建自己的类:

    public class WorkItem<T>
    {
        private StreamWriter resource;
        private T parameter;
    
        public WorkItem(StreamWriter resource, T parameter)
        {
            this.resource = resource;
            this.parameter = parameter;
        }
    
        public StreamWriter Resource { get { return resource; } }
        public T Parameter { get { return parameter; } }
    }
    

    事实上,将其通用化可能会针对您的情况进行过度设计。您可以将 T 定义为您想要的类型。

    此外,作为参考,.NET4 中包含了执行多线程的新方法,这些方法可能适用于您的用例,例如并发队列和并行任务库。它们还可以与信号量等传统方法结合使用。

    编辑:

    继续使用这种方法,下面是一个小示例类,演示如何使用:

    • 用于控制对有限资源的访问的信号量
    • 一个并发队列,用于在线程之间安全地管理该资源
    • 使用任务并行库进行任务管理

    这是Processor 类:

    public class Processor
    {
        private const int count = 3;
        private ConcurrentQueue<StreamWriter> queue = new ConcurrentQueue<StreamWriter>();
        private Semaphore semaphore = new Semaphore(count, count);
    
        public Processor()
        {
            // Populate the resource queue.
            for (int i = 0; i < count; i++) queue.Enqueue(new StreamWriter("sample" + i));
        }
    
        public void Process(int parameter)
        {
            // Wait for one of our resources to become free.
            semaphore.WaitOne();
            StreamWriter resource;
            queue.TryDequeue(out resource);
    
            // Dispatch the work to a task.
            Task.Factory.StartNew(() => Process(resource, parameter));
        }
    
        private Random random = new Random();
    
        private void Process(StreamWriter resource, int parameter)
        {
            // Do work in background with resource.
            Thread.Sleep(random.Next(10) * 100);
            resource.WriteLine("Parameter = {0}", parameter);
            queue.Enqueue(resource);
            semaphore.Release();
        }
    }
    

    现在我们可以像这样使用这个类了:

    var processor = new Processor();
    for (int i = 0; i < 10; i++)
        processor.Process(i);
    

    同时安排的任务不超过三个,每个任务都有自己的StreamWriter资源,可以回收。

    【讨论】:

    • 问题是我不想为每个任务提供自己的 StreamWriter。我希望每个 StreamWriter 都属于一个线程,该线程将在执行任务时重用它。
    猜你喜欢
    • 2011-02-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多