【问题标题】:Thread-safe blocking queue implementation on .NET.NET 上的线程安全阻塞队列实现
【发布时间】:2010-10-22 12:56:41
【问题描述】:

我正在寻找 .NET 的线程安全阻塞队列的实现。 “线程安全阻塞队列”是指: - 线程安全访问队列,其中 Dequeue 方法调用阻塞线程,直到其他线程放入(入队)某个值。

在我找到这个的那一刻: http://www.eggheadcafe.com/articles/20060414.asp (但它适用于 .NET 1.1)。

有人可以评论/批评此实现的正确性。 或者推荐另一个。 提前致谢。

【问题讨论】:

    标签: .net multithreading collections queue


    【解决方案1】:

    队列同步http://msdn.microsoft.com/en-us/library/system.collections.queue.synchronized(VS.71).aspx

    无论如何都是一个起点,我从未使用过阻塞队列。对于不太相关的帖子,我们深表歉意。

    【讨论】:

    • 我认为这只是一个线程安全的队列,与他所要求的略有不同。
    • 是的,我发帖后就看到了。编辑以反映
    • Queue.Synchronized() 返回的包装器的 Dequeue 方法是否阻塞当前线程?不。假设您有两个线程,其中一个正在将值放入队列中,另一个正在从该队列中获取值。第二个线程必须将队列集中在一个循环消耗 CPU 中。这很糟糕。
    【解决方案2】:

    这个Creating a blocking Queue in .NET怎么样?

    如果您在 .NET 1.1 中需要它(我从问题中不确定),只需删除泛型并将 T 替换为 object

    【讨论】:

    • 感谢您的链接。奇怪的是,我没有通过搜索“队列”找到该主题。好吧,那个话题和我的话题差不多。顺便说一句:我不需要移植到 .net 1.1。相关主题的解决方案与eggheadcafe.com/articles/20060414.asp上的解决方案非常相似
    • @Shrike,一点也不奇怪。这只是 StackOverflow 搜索有多糟糕的另一个例子。太糟糕了,每个人都会告诉你只使用谷歌(和“站点:”命令)。
    • @Ash:这可能是真的......这当然是我搜索stackoverflow的方式;-p
    【解决方案3】:

    微软有一个很好的例子:

    //Copyright (C) Microsoft Corporation.  All rights reserved.
    
    using System;
    using System.Threading;
    using System.Collections;
    using System.Collections.Generic;
    
    // The thread synchronization events are encapsulated in this 
    // class to allow them to easily be passed to the Consumer and 
    // Producer classes. 
    public class SyncEvents
    {
        public SyncEvents()
        {
            // AutoResetEvent is used for the "new item" event because
            // we want this event to reset automatically each time the
            // consumer thread responds to this event.
            _newItemEvent = new AutoResetEvent(false);
    
            // ManualResetEvent is used for the "exit" event because
            // we want multiple threads to respond when this event is
            // signaled. If we used AutoResetEvent instead, the event
            // object would revert to a non-signaled state with after 
            // a single thread responded, and the other thread would 
            // fail to terminate.
            _exitThreadEvent = new ManualResetEvent(false);
    
            // The two events are placed in a WaitHandle array as well so
            // that the consumer thread can block on both events using
            // the WaitAny method.
            _eventArray = new WaitHandle[2];
            _eventArray[0] = _newItemEvent;
            _eventArray[1] = _exitThreadEvent;
        }
    
        // Public properties allow safe access to the events.
        public EventWaitHandle ExitThreadEvent
        {
            get { return _exitThreadEvent; }
        }
        public EventWaitHandle NewItemEvent
        {
            get { return _newItemEvent; }
        }
        public WaitHandle[] EventArray
        {
            get { return _eventArray; }
        }
    
        private EventWaitHandle _newItemEvent;
        private EventWaitHandle _exitThreadEvent;
        private WaitHandle[] _eventArray;
    }
    
    // The Producer class asynchronously (using a worker thread)
    // adds items to the queue until there are 20 items.
    public class Producer 
    {
        public Producer(Queue<int> q, SyncEvents e)
        {
            _queue = q;
            _syncEvents = e;
        }
        public void ThreadRun()
        {
            int count = 0;
            Random r = new Random();
            while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
            {
                lock (((ICollection)_queue).SyncRoot)
                {
                    while (_queue.Count < 20)
                    {
                        _queue.Enqueue(r.Next(0, 100));
                        _syncEvents.NewItemEvent.Set();
                        count++;
                    }
                }
            }
            Console.WriteLine("Producer thread: produced {0} items", count);
        }
        private Queue<int> _queue;
        private SyncEvents _syncEvents;
    }
    
    // The Consumer class uses its own worker thread to consume items
    // in the queue. The Producer class notifies the Consumer class
    // of new items with the NewItemEvent.
    public class Consumer
    {
        public Consumer(Queue<int> q, SyncEvents e)
        {
            _queue = q;
            _syncEvents = e;
        }
        public void ThreadRun()
        {
            int count = 0;
            while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
            {
                lock (((ICollection)_queue).SyncRoot)
                {
                    int item = _queue.Dequeue();
                }
                count++;
            }
            Console.WriteLine("Consumer Thread: consumed {0} items", count);
        }
        private Queue<int> _queue;
        private SyncEvents _syncEvents;
    }
    
    public class ThreadSyncSample
    {
        private static void ShowQueueContents(Queue<int> q)
        {
            // Enumerating a collection is inherently not thread-safe,
            // so it is imperative that the collection be locked throughout
            // the enumeration to prevent the consumer and producer threads
            // from modifying the contents. (This method is called by the
            // primary thread only.)
            lock (((ICollection)q).SyncRoot)
            {
                foreach (int i in q)
                {
                    Console.Write("{0} ", i);
                }
            }
            Console.WriteLine();
        }
    
        static void Main()
        {
            // Configure struct containing event information required
            // for thread synchronization. 
            SyncEvents syncEvents = new SyncEvents();
    
            // Generic Queue collection is used to store items to be 
            // produced and consumed. In this case 'int' is used.
            Queue<int> queue = new Queue<int>();
    
            // Create objects, one to produce items, and one to 
            // consume. The queue and the thread synchronization
            // events are passed to both objects.
            Console.WriteLine("Configuring worker threads...");
            Producer producer = new Producer(queue, syncEvents);
            Consumer consumer = new Consumer(queue, syncEvents);
    
            // Create the thread objects for producer and consumer
            // objects. This step does not create or launch the
            // actual threads.
            Thread producerThread = new Thread(producer.ThreadRun);
            Thread consumerThread = new Thread(consumer.ThreadRun);
    
            // Create and launch both threads.     
            Console.WriteLine("Launching producer and consumer threads...");        
            producerThread.Start();
            consumerThread.Start();
    
            // Let producer and consumer threads run for 10 seconds.
            // Use the primary thread (the thread executing this method)
            // to display the queue contents every 2.5 seconds.
            for (int i = 0; i < 4; i++)
            {
                Thread.Sleep(2500);
                ShowQueueContents(queue);
            }
    
            // Signal both consumer and producer thread to terminate.
            // Both threads will respond because ExitThreadEvent is a 
            // manual-reset event--so it stays 'set' unless explicitly reset.
            Console.WriteLine("Signaling threads to terminate...");
            syncEvents.ExitThreadEvent.Set();
    
            // Use Join to block primary thread, first until the producer thread
            // terminates, then until the consumer thread terminates.
            Console.WriteLine("main thread waiting for threads to finish...");
            producerThread.Join();
            consumerThread.Join();
        }
    }
    

    【讨论】:

      【解决方案4】:

      请记住,如果您可以完全控制调用代码,则锁定调用代码可能是更好的选择。考虑循环访问您的队列:您将不必要地多次获取锁,可能会导致性能损失。

      【讨论】:

        【解决方案5】:

        微软的例子是一个很好的例子,但它没有被封装到一个类中。此外,它要求使用者线程在 MTA 中运行(因为 WaitAny 调用)。在某些情况下,您可能需要在 STA 中运行(例如,如果您正在执行 COM 互操作)。在这些情况下,不能使用 WaitAny。

        我有一个简单的阻塞队列类来解决这个问题: http://element533.blogspot.com/2010/01/stoppable-blocking-queue-for-net.html

        【讨论】:

          【解决方案6】:

          作为参考,.NET 4 引入了System.Collections.Concurrent.BlockingCollection&lt;T&gt; 类型来解决这个问题。对于非阻塞队列,您可以使用System.Collections.Concurrent.ConcurrentQueue&lt;T&gt;。请注意,ConcurrentQueue&lt;T&gt; 可能会用作BlockingCollection&lt;T&gt; 的底层数据存储,以供 OP 使用。

          【讨论】:

          • 您能否提供一个添加几个方法的代码示例,以便我们看到它的实际效果?
          【解决方案7】:

          是的,.NET4 包含并发集合。顺便说一句,来自 pfx 团队的关于并行扩展的非常好的手册 - http://www.microsoft.com/downloads/details.aspx?FamilyID=86b3d32b-ad26-4bb8-a3ae-c1637026c3ee&displaylang=en

          pfx 作为 Reactive Extensions 的一部分也可用于 .net 3.5。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2011-05-12
            • 2017-09-13
            • 2012-05-30
            • 2019-02-12
            • 2019-11-01
            • 1970-01-01
            相关资源
            最近更新 更多