【问题标题】:Notify thread when data is added in queue当数据添加到队列中时通知线程
【发布时间】:2013-09-19 07:00:26
【问题描述】:

我有一个线程正在向队列中添加数据,现在我希望其他线程在添加数据时得到通知,以便它可以开始处理队列中的数据。

一个选项是线程将不断轮询队列以查看计数是否大于零,但我认为这不是好方法,任何其他建议将不胜感激

任何建议我如何实现这一点,我正在使用 .net framework 3.5。

如果我有两个线程,一个在做q.Enqueue(data),另一个在做q.dequeue(),在这种情况下我需要管理锁吗?

【问题讨论】:

  • code.msdn.microsoft.com/windowsdesktop/… 或使用ManualResetEventAutoResetEvent
  • 是的,您需要为队列加锁。但是第二个线程不知道何时将某些内容添加到队列中。您不想轮询(queue.peek),您可以使用事件来触发第二个线程,您仍然需要锁定队列。看我的例子,这是处理事件(无轮询),并锁定队列。

标签: c# multithreading thread-safety queue


【解决方案1】:

您可以使用ManualResetEvent 通知线程。

ManualResetEvent e = new ManualResetEvent(false);

在每个q.enqueue(); 之后执行e.Set() 并在处理线程中,您等待带有e.WaitOne() 的项目。

如果您在循环内进行处理,您应该在e.WaitOne() 之后立即执行e.Reset()

【讨论】:

  • 我建议改用 ManualResetEvent 来使用 AutoResetEvent。当 WaitOne() 和 Reset() 之间排队时,ManualResetEvent.Reset() 可能会出错
【解决方案2】:

我不使用队列,因为我宁愿对它们进行批处理。这在您必须打开/关闭(日志)文件、打开/关闭数据库时更有用。这是我如何创建这样的示例:

// J. van Langen
public abstract class QueueHandler<T> : IDisposable
{
    // some events to trigger.
    ManualResetEvent _terminating = new ManualResetEvent(false);
    ManualResetEvent _terminated = new ManualResetEvent(false);
    AutoResetEvent _needProcessing = new AutoResetEvent(false);

    // my 'queue'
    private List<T> _queue = new List<T>();

    public QueueHandler()
    {
        new Thread(new ThreadStart(() =>
        {
            // what handles it should wait on.
            WaitHandle[] handles = new WaitHandle[] { _terminating, _needProcessing };

            // while not terminating, loop (0 timeout)
            while (!_terminating.WaitOne(0))
            {
                // wait on the _terminating and the _needprocessing handle.
                WaitHandle.WaitAny(handles);

                // my temporay array to store the current items.
                T[] itemsCopy;

                // lock the queue
                lock (_queue)
                {
                    // create a 'copy'
                    itemsCopy = _queue.ToArray();

                    // clear the queue.
                    _queue.Clear();
                }

                if (itemsCopy.Length > 0)
                    HandleItems(itemsCopy);
            }

            // the thread is done.
            _terminated.Set();

        })).Start();
    }

    public abstract void HandleItems(T[] items);

    public void Enqueue(T item)
    {
        // lock the queue to add the item.
        lock (_queue)
            _queue.Add(item);

        _needProcessing.Set();
    }

    // batch
    public void Enqueue(IEnumerable<T> items)
    {
        // lock the queue to add multiple items.
        lock (_queue)
            _queue.AddRange(items);

        _needProcessing.Set();
    }

    public void Dispose()
    {
        // let the thread know it should stop.
        _terminating.Set();

        // wait until the thread is stopped.
        _terminated.WaitOne();
    }

}

对于_terminating/_terminated,我使用ManualResetEvent,因为它们只是设置的。

对于_needProcessing,我使用AutoResetEvent,它不能通过ManualResetEvent来完成,因为当它被触发时,另一个线程可以再次Set它,所以如果你在WaitHandle.WaitAny之后Reset它您可以撤消新添加的项目。 (嗯,如果有人可以更简单地解释一下,欢迎。:)

例子:

public class QueueItem
{
}

public class MyQueue : QueueHandler<QueueItem>
{
    public override void HandleItems(QueueItem[] items)
    {
        // do your thing.
    }
}


public void Test()
{
    MyQueue queue = new MyQueue();

    QueueItem item = new QueueItem();
    queue.Enqueue(item);

    QueueItem[] batch = new QueueItem[]
    {
        new QueueItem(),
        new QueueItem()
    };

    queue.Enqueue(batch);

    // even on dispose, all queued items will be processed in order to stop the QueueHandler.
    queue.Dispose();
}

【讨论】:

  • 感谢您目前提供的如此详尽的解释,ManualResetEvent 对我来说可以,就像您的代码一样。
【解决方案3】:

使用BlockingCollection 类。这样做的好处是,如果队列为空,Take 方法会阻塞(不进行轮询)。它包含在 .NET 4.0+ 中或作为Reactive Extension 下载的一部分,甚至可能是TPL backport via NuGet。如果您愿意,可以使用以下未优化的类变体。

public class BlockingCollection<T>
{
    private readonly Queue<T> m_Queue = new Queue<T>();

    public void Add(T item)
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(item);
            Monitor.Pulse(m_Queue);
        }
    }

    public T Take()
    {
        lock (m_Queue)
        {
            while (m_Queue.Count == 0)
            {
                Monitor.Wait(m_Queue);
            }
            return m_Queue.Dequeue();
        }
    }

    public bool TryTake(out T item)
    {
        item = default(T);
        lock (m_Queue)
        {
            if (m_Queue.Count > 0)
            {
                item = m_Queue.Dequeue();
            }
        }
        return item != null;
    }

}

【讨论】:

    【解决方案4】:

    我认为 BlockingCollection 会比 Queue 做得更好。除此之外,持续检查队列大小(并在线程为零时暂停线程)是非常好的方法。

    顺便说一句,我们在这里谈论的是生产者-消费者模式。我想你可以用谷歌搜索其他方法。

    【讨论】:

    • 哎呀,很确定 BC 是老 8(
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多