【问题标题】:Queue and Dequeue in multiple threads多线程中的队列和出队
【发布时间】:2013-06-18 05:58:18
【问题描述】:

我正在创建一个索引器,它使需要处理的项目入队。索引器会将项目添加到其处理器。例如,它将添加 100 个项目,然后在 3 分钟内不添加项目并添加另外 50 个项目。

public class Processer
{
    private ConcurrentQueue<Item> items;

    public void AddItem(Item item)
    {
        this.items.Enqueue(item);
    }
}

这些项目会随机进入,所以我将创建一个单独的线程来出列和处理这些项目。

最好的选择是什么?

  1. 不要使用集合,而是使用线程池:

    public void AddItem(Item item)
    {
        ThreadPool.QueueUserWorkItem(function, item);
    }
    

    这将自动创建一个队列,并处理项目,但我控制较少,当找到 20 个项目时,它们几乎会停止我的索引器运行并首先完成这个线程池

  2. 使用长时间运行的任务:

    public Processer()
    {
        this.task = Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }
    
    public DequeueItems()
    {
        while(true)
        {
            Item item = null;
            while(this.items.TryDequeue(out item)
            {
                this.store.ExecuteIndex((AbstractIndexCreationTask)item);
            }
    
            Thread.Sleep(100); 
        }
    }
    

    但我讨厌必须使用的 while() 和 thread.sleep,因为可枚举的对象会在一段时间后干涸,并且需要重新检查是否有新项目。

  3. 使用短期运行任务:

    public Processer()
    {
    
    }
    private void Run()
    {
        this.task = Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.PreferFairness,
            TaskScheduler.Default);
    }
    public void AddItem(Item item)
    {
        this.items.Add(item);
        if(this.task == null || this.task.isCompleted)
            this.Run();
    }
    public DequeueItems()
    {
        Item item = null;
        while(this.items.TryDequeue(out item)
        {
            this.store.ExecuteIndex((AbstractIndexCreationTask)item);
        }
    }
    

    这可能会更好?但是启动一个线程是一个“昂贵”的操作,我不知道我是否会错过项目,因为我检查了 IsCompleted,这可能是在结束 while 循环的过程中,这样会丢失 1 个项目。但它不会休眠,并使用脏的 while 循环。

  4. 您的选择,由于 MSDN 建议使用 TPL,我认为不使用线程,但也许有更好的方法来处理这个问题

更新日志

  1. 更改为 BlockingCollection
  2. 改回并发队列

我检查过的一些事情:

【问题讨论】:

  • 是的,我知道我可以使用其他线程安全队列,但队列不是问题,我希望就处理此问题的最佳实践获得一些反馈。
  • @user2331234:不,队列非常的问题。它不是线程安全的,而 BlockingCollection&lt;T&gt; 专为像您这样的生产者/消费者情况而设计。
  • 我的问题是如何添加一个单独的线程来处理项目。我将使用 BlockingCollection 调整上面的示例。但我仍然希望得到一些反馈,哪些选项用于使队列出队。
  • 我发表评论后才看到您的编辑。我的理解是GetConsumingEnumerable会无限期地等待直到CompleteAdding被调用,所以我还是不明白Thread.Sleepwhile循环。

标签: c# multithreading task-parallel-library task threadpool


【解决方案1】:

我认为这里最简单的解决方案是使用BlockingCollection(可能使用它的GetConsumingEnumerable())和一个长期运行的Task。无事可做的时候,这会浪费一个Thread,但是一个浪费的Thread也不错。

如果您不能浪费Thread,那么您可以选择您的#3 之类的东西。但是你必须非常小心地使它成为线程安全的。例如,在您的代码中,如果 Task 没有运行并且同时从两个线程调用 AddItem(),您最终会创建两个 Tasks,这几乎肯定是错误的。

如果您使用的是 .Net 4.5,另一种选择是使用来自 TPL Dataflow 的ActionBlock。有了它,您不会浪费任何线程,也不必自己编写困难的线程安全代码。

【讨论】:

  • 谢谢,我会加一个锁来确保它只运行一次。很遗憾我不能使用 .NET 4.5,因为我正在为 windows server 2003+ 创建软件...
【解决方案2】:

我认为信号量可能适合您。你会找到一个很好的解释here

另外我建议使用ConcurrentQueue

【讨论】:

  • stackoverflow.com/questions/15456499/… 就像这个一样,(在我的“已检查列表”中),我想知道使用 TPL 是否更好,但感谢您的示例。这可能确实是限制最大线程数的好方法,但在我的情况下它会打开很多待处理的线程,我不知道这是否会导致异常。有可能队列中有1000个(或更多)的item,会直接得到一个自己的线程,等待一个sem被释放。
  • 你能解释一下你将如何在这里使用Semaphore吗?
猜你喜欢
  • 2012-11-05
  • 1970-01-01
  • 2010-10-28
  • 2011-09-27
  • 2013-04-18
  • 1970-01-01
  • 2013-10-10
  • 1970-01-01
相关资源
最近更新 更多