【问题标题】:C# Producer-Consumer using Semaphores使用信号量的 C# 生产者-消费者
【发布时间】:2017-03-27 14:08:48
【问题描述】:

受信号量小书的启发,我决定使用信号量来实现生产者-消费者问题。

我特别希望能够随意停止所有 Worker 线程。 我已经对我的方法进行了广泛的测试,没有发现任何错误。

以下代码是用于测试的原型,可以作为控制台应用程序运行:

using System;
using System.Collections.Concurrent;
using System.Threading;
using NUnit.Framework;

public class ProducerConsumer
{
    private static readonly int _numThreads = 5;
    private static readonly int _numItemsEnqueued = 10;
    private static readonly Semaphore _workItems = new Semaphore(0, int.MaxValue);
    private static readonly ManualResetEvent _stop = new ManualResetEvent(false);
    private static ConcurrentQueue<int> _queue;

    public static void Main()
    {
        _queue = new ConcurrentQueue<int>();

        // Create and start threads.
        for (int i = 1; i <= _numThreads; i++)
        {
            Thread t = new Thread(new ParameterizedThreadStart(Worker));

            // Start the thread, passing the number.
            t.Start(i);
        }

        // Wait for half a second, to allow all the
        // threads to start and to block on the semaphore.
        Thread.Sleep(500);

        Console.WriteLine(string.Format("Main thread adds {0} items to the queue and calls Release() {0} times.", _numItemsEnqueued));
        for (int i = 1; i <= _numItemsEnqueued; i++)
        {
            Console.WriteLine("Waking up a worker thread.");
            _queue.Enqueue(i);
            _workItems.Release(); //wake up 1 worker
            Thread.Sleep(2000); //sleep 2 sec so it's clear the threads get unblocked 1 by 1
        }

        // sleep for 5 seconds to allow threads to exit
        Thread.Sleep(5000);
        Assert.True(_queue.Count == 0);

        Console.WriteLine("Main thread stops all threads.");
        _stop.Set();

        // wait a while to exit
        Thread.Sleep(5000);
        Console.WriteLine("Main thread exits.");
        Console.WriteLine(string.Format("Last value of Semaphore was {0}.", _workItems.Release()));
        Assert.True(_queue.Count == 0);
        Console.WriteLine("Press Enter to exit.");
        Console.ReadLine();
    }

    private static void Worker(object num)
    {
        // Each worker thread begins by requesting the semaphore.
        Console.WriteLine("Thread {0} begins and waits for the semaphore.", num);
        WaitHandle[] wait = { _workItems, _stop };
        int signal;
        while (0 == (signal = WaitHandle.WaitAny(wait)))
        {
            Console.WriteLine("Thread {0} becomes unblocked by Release() and has work to do.", num);
            int res;
            if (_queue.TryDequeue(out res))
            {
                Console.WriteLine("Thread {0} dequeues {1}.", num, res);
            }
            else
            {
                throw new Exception("this should not happen.");
            }
        }

        if (signal == 1)
            Console.WriteLine("Thread {0} was stopped.", num);

        Console.WriteLine("Thread {0} exits.", num);
    }
}

现在我的问题是,我使用WaitHandle.WaitAny(semaphore) 的假设是当我在信号量上调用Release() 时,只会唤醒1 个工人。但是,我无法在文档中找到确实如此的保证。谁能证实这是真的?

【问题讨论】:

  • 是的,这就是重点。
  • 你应该使用SemaforSlim,它更快。您也可以为此任务调查TPL Dataflow 库,您可以调整具有MaxDegreeOfParallelism 属性的工人数量,或者处理两个块之间的链接以完全停止管道
  • 谢谢@VMAtm,会调查的。

标签: c# multithreading concurrency producer-consumer


【解决方案1】:

确实很有趣,文档似乎没有明确说明在 WaitOne 的情况下只有 1 个线程会收到信号。当您熟悉了多线程理论时,这一点就变得不言而喻了。

是的,在Semaphore 上调用的WaitOne(以及在包括SemaphoreWaitHandles 列表上调用的WaitAny)由单个线程接收。如果您想从 MSDN 中引用,那么这里是,SemaphoreWaitHandlewhich is: 的子类

封装等待独占访问共享资源的操作系统特定对象。

所以是的,除非明确声明的方法提供独占访问。

例如ManualResetEvent 的方法WaitOne 将为所有等待线程解除阻塞,but documentation is explicit about it

通知一个或多个等待线程有事件发生。

【讨论】:

  • 谢谢,这是我所期待的答案。正是因为ManualResetEvent 的行为,我才想对Semaphore 的行为进行保证,所以你把它包括在内真是太好了。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-10-05
  • 1970-01-01
  • 1970-01-01
  • 2016-06-21
  • 2018-09-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多