【问题标题】:thread safe processing of a queue队列的线程安全处理
【发布时间】:2018-09-19 12:17:36
【问题描述】:

我将来自 UI 线程的任务放入队列中,以便它们可以在另一个线程中处理。如果无事可做,线程应该等待 AutoResetEvent - 显然所有这些都应该是线程安全的。

我将任务从 UI 线程放入队列中,如下所示:

lock (_syncObject)
{
    _queue.Enqueue(new FakeTask());
}

    _autoResetEvent.Set();

到目前为止,我的处理线程循环如下所示:

while (true)
{
    FakeTask task = null;
    lock (_syncObject)
    {
        if (_queue.Count > 0)
        {
            task = _queue.Dequeue();
        }
    }

    if (task != null)
    {
        task.Run();
        Thread.Sleep(1000);  //just here for testing purposes
    }

    if (_queue.Count == 0)
    {
        _autoResetEvent.WaitOne();
    }
}

我担心我检查队列中是否有其他东西的最后一部分不是线程安全的,我想知道我是如何做到的。谢谢!

【问题讨论】:

  • 您可以使用ConcurrentQueue 而不是自己处理锁
  • 一开始我也想过ConcurrentQueue,但是这个不就是锁了EnqueueDequeue的方法吗?我仍然会错过在最后两行之间添加一些东西的竞态条件?
  • 如果您使用TryPeek 而不是计数。您也可以查看Producer Consumer pattern using the Dataflow library,看看是否更适合您的需求
  • 看看专门为此目的设计的BlockingCollection

标签: c# .net multithreading thread-safety


【解决方案1】:

简单情况中,尝试使用为实现Producer / Consumer pattern专门设计BlockingCollection

private async Task Process() {
  using (BlockingCollection<FakeTask> _queue = new BlockingCollection<FakeTask>()) {
    Task producer = Task.Run(() => {
      while (!completed) {
        //TODO: put relevant code here 
        _queue.Add(new FakeTask());
      }

      _queue.CompleteAdding();
    });

    Task consumer = Task.Run(() => {
      foreach (FakeTask task in _queue.GetConsumingEnumerable()) {
        //TODO: process task - put relevant code here
      }
    });

    await Task.WhenAll(producer, consumer).ConfigureAwait(false); 
  }
}

编辑:如果producerUI线程本身:

private async Task Process() {
  using (BlockingCollection<FakeTask> _queue = new BlockingCollection<FakeTask>()) {
    Task consumer = Task.Run(() => {
      foreach (FakeTask task in _queue.GetConsumingEnumerable()) {
        //TODO: process task - put relevant code here
      }
    });

    // If we produce in UI we don't want any separate Task 
    while (!completed) {
      //TODO: put relevant code here 
      _queue.Add(new FakeTask());
    }

    _queue.CompleteAdding();

    await consumer.ConfigureAwait(false); 
  }
}

纠缠网格的情况下(例如,生产者#1、#2 为消费者#1、#2、#3 生成任务,而消费者#1、#2、#3 又为……创建任务)尝试DataFlow

【讨论】:

  • 谢谢,我的案例唯一的特点是任务必须按照添加的顺序执行,而不是并行执行。这个解决方案能保证吗?
  • @oleole:任务将被消费按顺序(先到先消费),这是BlockingCollection的默认行为
  • 好的,谢谢,我只是想知道这一点,因为文档说有一个构造函数可以包装 ConcurrentQueue。如果无论如何都能保证订单,为什么需要这样做?
  • @oleole:为了灵活性:您可以提供自己的支持集合(例如stack优先级查询等。 ) 这在很多情况下很有用
【解决方案2】:

创建一个线程只是为了让它基本上可以把所有的时间都花在无所事事地等待工作完成是没有用的。

您需要做的就是围绕 UI 任务对要完成的后台工作的调度使用异步锁定机制。 SemaphoreSlim 提供了这样的异步同步机制。

await sempahoreSlim.WaitAsync();
try
{
    await Task.Run(() => DoWork());
}
finally
{
    sempahoreSlim.Release();
}

不仅代码少了很多,而且代码更简单,更准确地反映了应用程序的业务逻辑,而且您消耗的系统资源也少了很多。

当然,如果不同的后台操作可以​​安全地并行运行,那么只需使用线程池,而不是你自己的消息循环,代码就变得更简单了。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-11-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-02-23
    • 1970-01-01
    • 1970-01-01
    • 2012-12-26
    相关资源
    最近更新 更多