【问题标题】:Producer/ Consumer pattern using threads and EventWaitHandle使用线程和 EventWaitHandle 的生产者/消费者模式
【发布时间】:2017-02-13 05:39:32
【问题描述】:

我想这有点像代码审查,但这是我对生产者/消费者模式的实现。我想知道的是,ReceivingThread()SendingThread() 方法中的 while 循环可能会停止执行。请注意,EnqueueSend(DataSendEnqeueInfo info) 是从多个不同的线程调用的,我可能无法在这里使用任务,因为我必须在单独的线程中使用命令。

private Thread mReceivingThread;
private Thread mSendingThread;
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue;
private Queue<DataSendEnqeueInfo> mSendingThreadQueue;
private readonly object mReceivingQueueLock = new object();
private readonly object mSendingQueueLock = new object();
private bool mIsRunning;
EventWaitHandle mRcWaitHandle;
EventWaitHandle mSeWaitHandle;

private void ReceivingThread()
{
    while (mIsRunning)
    {
        mRcWaitHandle.WaitOne();
        DataRecievedEnqeueInfo item = null;
        while (mReceivingThreadQueue.Count > 0)
        {
            lock (mReceivingQueueLock)
            {
                item = mReceivingThreadQueue.Dequeue();
            }
            ProcessReceivingItem(item);
        }
        mRcWaitHandle.Reset();
    }
}

private void SendingThread()
{
    while (mIsRunning)
    {
        mSeWaitHandle.WaitOne();
        while (mSendingThreadQueue.Count > 0)
        {
            DataSendEnqeueInfo item = null;
            lock (mSendingQueueLock)
            {
                item = mSendingThreadQueue.Dequeue();
            }
            ProcessSendingItem(item);
        }
        mSeWaitHandle.Reset();
    }
}

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
    lock (mReceivingQueueLock)
    {
        mReceivingThreadQueue.Enqueue(info);
        mRcWaitHandle.Set();
    }
}

public void EnqueueSend(DataSendEnqeueInfo info)
{
     lock (mSendingQueueLock)
    {
        mSendingThreadQueue.Enqueue(info);
        mSeWaitHandle.Set();
    }
}

P.S 这里的想法是,当队列为空时,我使用WaitHandles 使线程进入睡眠状态,并在新项目入队时发出信号让它们开始。

更新 我将把这个https://blogs.msdn.microsoft.com/benwilli/2015/09/10/tasks-are-still-not-threads-and-async-is-not-parallel/ 留给那些可能试图使用 TPL 或任务来实现生产者/消费者模式的人。

【问题讨论】:

  • 您最好使用BlockingCollection,它会为您处理所有同步逻辑。
  • 屏蔽收藏是一个不错的选择,不过你也可以试试TPL Dataflow。如果需要,我可以提供一些示例逻辑。
  • @VMAtm 我一直想了解 TPL 数据流。如果您能给我一些示例逻辑,那将是您的好意,当然,前提是它不会太麻烦。
  • @SushantPoojary 我已经用一些示例添加了答案
  • 嗨,Sushant Poojary,请查看this code

标签: c# multithreading producer-consumer tpl-dataflow blockingcollection


【解决方案1】:

使用BlockingCollection 代替队列、EventWaitHandle 和锁定对象:

public class DataInfo { }

private Thread mReceivingThread;
private Thread mSendingThread;

private BlockingCollection<DataInfo> queue;

private CancellationTokenSource receivingCts = new CancellationTokenSource();

private void ReceivingThread()
{
    try
    {
        while (!receivingCts.IsCancellationRequested)
        {
            // This will block until an item is added to the queue or the cancellation token is cancelled
            DataInfo item = queue.Take(receivingCts.Token);

            ProcessReceivingItem(item);
        }
    }
    catch (OperationCanceledException)
    {

    }
}

internal void EnqueueRecevingData(DataInfo info)
{
    // When a new item is produced, just add it to the queue
    queue.Add(info);
}

// To cancel the receiving thread, cancel the token
private void CancelReceivingThread()
{
    receivingCts.Cancel();
}

【讨论】:

  • 您有一个很好的BlockingCollection 示例,您能否进一步详细说明如何启动这个单独的线程?你会从另一个线程向这个集合添加一个项目吗?
【解决方案2】:

就个人而言,对于简单的生产者-消费者问题,我只会使用BlockingCollection。无需手动编写您自己的同步逻辑。如果队列中没有项目,消费线程也会阻塞。

如果您使用此类,您的代码可能如下所示:

private BlockingCollection<DataRecievedEnqeueInfo> mReceivingThreadQueue = new BlockingCollection<DataRecievedEnqeueInfo>();
private BlockingCollection<DataSendEnqeueInfo> mSendingThreadQueue = new BlockingCollection<DataSendEnqeueInfo>();

public void Stop()
{
    // No need for mIsRunning. Makes the enumerables in the GetConsumingEnumerable() calls
    // below to complete.
    mReceivingThreadQueue.CompleteAdding();
    mSendingThreadQueue.CompleteAdding();
}

private void ReceivingThread()
{
    foreach (DataRecievedEnqeueInfo item in mReceivingThreadQueue.GetConsumingEnumerable())
    {
        ProcessReceivingItem(item);
    }
}

private void SendingThread()
{
    foreach (DataSendEnqeueInfo item in mSendingThreadQueue.GetConsumingEnumerable())
    {
        ProcessSendingItem(item);
    }
}

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info)
{
    // You can also use TryAdd() if there is a possibility that you
    // can add items after you have stopped. Otherwise, this can throw an
    // an exception after CompleteAdding() has been called.
    mReceivingThreadQueue.Add(info);
}

public void EnqueueSend(DataSendEnqeueInfo info)
{
    mSendingThreadQueue.Add(info);
}

【讨论】:

  • 如果我没记错的话,BlockingCollection.GetConsumingEnumerable() 不一定会按照加入队列的顺序返回集合,这对我来说是个问题。另外我希望线程在集合为空时休眠,这就是为什么要使用 Waithandles,所以这也是在内部处理的吗?我也将对此进行基准测试,但在我这样做之前,您知道 BlockingCollections 的表现是好是坏?
  • BlockingCollection 将使用您提供的任何IProducerConsumerCollection&lt;T&gt;。默认情况下,它将使用ConcurrentQueue&lt;T&gt;,因此它将按照排队的顺序进行处理。与您编写解决方案的方式相同。性能方面?只有您可以对其进行概要分析。但我愿意打赌,与你(或我)手卷的任何东西相比,它至少会相同或更快。
  • 哇! BlockingCollection.GetConsumingEnumerable() 自动阻塞线程。我不知道为什么总是害怕在并发和/或线程方面使用较新的 API。 -_- 。感谢您的宝贵时间,我学到了一些新东西。
【解决方案3】:

按照 cmets 中的建议,您也可以尝试使用 TPL Dataflow 块。

据我所知,您有两个相似的管道,用于接收和发送,所以我假设您的类层次结构是这样的:

class EnqueueInfo { }
class DataRecievedEnqeueInfo : EnqueueInfo { }
class DataSendEnqeueInfo : EnqueueInfo { }

我们可以组装一个抽象类,它将封装创建管道的逻辑,并提供处理项目的接口,如下所示:

abstract class EnqueueInfoProcessor<T>
    where T : EnqueueInfo
{
    // here we will store all the messages received before the handling
    private readonly BufferBlock<T> _buffer;
    // simple action block for actual handling the items
    private ActionBlock<T> _action;

    // cancellation token to cancel the pipeline
    public EnqueueInfoProcessor(CancellationToken token)
    {
        _buffer = new BufferBlock<T>(new DataflowBlockOptions { CancellationToken = token });
        _action = new ActionBlock<T>(item => ProcessItem(item), new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            CancellationToken = token
        });

        // we are linking two blocks so all the items from buffer
        // will flow down to action block in order they've been received
        _buffer.LinkTo(_action, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void PostItem(T item)
    {
        // synchronously wait for posting to complete
        _buffer.Post(item);
    }

    public async Task SendItemAsync(T item)
    {
        // asynchronously wait for message to be posted
        await _buffer.SendAsync(item);
    }

    // abstract method to implement
    protected abstract void ProcessItem(T item);
}

请注意,您也可以使用Encapsulate&lt;TInput, TOutput&gt; 方法封装两个块之间的链接,但在这种情况下,您必须正确处理缓冲区块的Completion,如果您正在使用它。

在此之后,我们只需要实现接收和发送句柄逻辑的两个方法:

public class SendEnqueueInfoProcessor : EnqueueInfoProcessor<DataSendEnqeueInfo>
{
    SendEnqueueInfoProcessor(CancellationToken token)
        : base(token)
    {

    }
    protected override void ProcessItem(DataSendEnqeueInfo item)
    {
        // send logic here
    }
}

public class RecievedEnqueueInfoProcessor : EnqueueInfoProcessor<DataRecievedEnqeueInfo>
{
    RecievedEnqueueInfoProcessor(CancellationToken token)
        : base(token)
    {

    }
    protected override void ProcessItem(DataRecievedEnqeueInfo item)
    {
        // recieve logic here
    }
}

如果您的消息流是关于ReceiveInfo 消息变成SendInfo,您也可以使用TransformBlock&lt;DataRecievedEnqeueInfo, DataSendEnqeueInfo&gt; 创建更复杂的管道。

【讨论】:

  • 哇!谢谢你这么详细的例子。几天来,我一直在通过 MSDN 阅读有关 TransformBlock 的信息,它看起来很有趣。我将实现这段代码,看看它是如何工作的。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-01-22
  • 2013-11-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-09-24
相关资源
最近更新 更多