【发布时间】:2017-03-27 14:08:48
【问题描述】:
受信号量小书的启发,我决定使用信号量来实现生产者-消费者问题。
我特别希望能够随意停止所有 Worker 线程。 我已经对我的方法进行了广泛的测试,没有发现任何错误。
以下代码是用于测试的原型,可以作为控制台应用程序运行:
using System;
using System.Collections.Concurrent;
using System.Threading;
using NUnit.Framework;
public class ProducerConsumer
{
private static readonly int _numThreads = 5;
private static readonly int _numItemsEnqueued = 10;
private static readonly Semaphore _workItems = new Semaphore(0, int.MaxValue);
private static readonly ManualResetEvent _stop = new ManualResetEvent(false);
private static ConcurrentQueue<int> _queue;
public static void Main()
{
_queue = new ConcurrentQueue<int>();
// Create and start threads.
for (int i = 1; i <= _numThreads; i++)
{
Thread t = new Thread(new ParameterizedThreadStart(Worker));
// Start the thread, passing the number.
t.Start(i);
}
// Wait for half a second, to allow all the
// threads to start and to block on the semaphore.
Thread.Sleep(500);
Console.WriteLine(string.Format("Main thread adds {0} items to the queue and calls Release() {0} times.", _numItemsEnqueued));
for (int i = 1; i <= _numItemsEnqueued; i++)
{
Console.WriteLine("Waking up a worker thread.");
_queue.Enqueue(i);
_workItems.Release(); //wake up 1 worker
Thread.Sleep(2000); //sleep 2 sec so it's clear the threads get unblocked 1 by 1
}
// sleep for 5 seconds to allow threads to exit
Thread.Sleep(5000);
Assert.True(_queue.Count == 0);
Console.WriteLine("Main thread stops all threads.");
_stop.Set();
// wait a while to exit
Thread.Sleep(5000);
Console.WriteLine("Main thread exits.");
Console.WriteLine(string.Format("Last value of Semaphore was {0}.", _workItems.Release()));
Assert.True(_queue.Count == 0);
Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
}
private static void Worker(object num)
{
// Each worker thread begins by requesting the semaphore.
Console.WriteLine("Thread {0} begins and waits for the semaphore.", num);
WaitHandle[] wait = { _workItems, _stop };
int signal;
while (0 == (signal = WaitHandle.WaitAny(wait)))
{
Console.WriteLine("Thread {0} becomes unblocked by Release() and has work to do.", num);
int res;
if (_queue.TryDequeue(out res))
{
Console.WriteLine("Thread {0} dequeues {1}.", num, res);
}
else
{
throw new Exception("this should not happen.");
}
}
if (signal == 1)
Console.WriteLine("Thread {0} was stopped.", num);
Console.WriteLine("Thread {0} exits.", num);
}
}
现在我的问题是,我使用WaitHandle.WaitAny(semaphore) 的假设是当我在信号量上调用Release() 时,只会唤醒1 个工人。但是,我无法在文档中找到确实如此的保证。谁能证实这是真的?
【问题讨论】:
-
是的,这就是重点。
-
你应该使用
SemaforSlim,它更快。您也可以为此任务调查TPL Dataflow库,您可以调整具有MaxDegreeOfParallelism属性的工人数量,或者处理两个块之间的链接以完全停止管道 -
谢谢@VMAtm,会调查的。
标签: c# multithreading concurrency producer-consumer