【问题标题】:Consume from Queue with multiple threads/tasks从具有多个线程/任务的队列中消费
【发布时间】:2013-06-20 18:46:24
【问题描述】:

我有一个生产者从资源中获取用户并将它们放入 ConcurrentQueue,然后我想要做的是使用多个消费者并处理所有用户并从另一个资源中获取他们的信息。

  public void Populate(IEnumerable<Users> users){
     _queue.Enqueue(users);
     // here single threaded
  }

  public void Process(){
     // here i want this to be processed by multiple consumers
     // say multiple threads so that I can finish processing them.
  }

我的问题是,我应该使用线程吗?任务?线程池?

我看过这个问题:C# equivalent for Java ExecutorService.newSingleThreadExecutor(), or: how to serialize mulithreaded access to a resource

【问题讨论】:

  • 随心所欲;他们都可以完成这项工作。请注意,它们是建立在彼此之上的。 Task 使用线程池(一般),线程池使用线程实现。
  • @Servy:只是补充一点,虽然他们都可以完成这项工作,但在一般情况下,如果任务之间存在一些关系,如继续或分支,最好使用更高级别的 API,如 TPL此外,还与 C# 4.5 的新 await/async 功能无缝集成。
  • @Pragmateek 归根结底,这完全是个人喜好。如果有人发现将Thread 用于所有事情而不是任务更容易,他们可以这样做。大多数人似乎发现任务更容易处理,但这只是一种意见。这也取决于问题;有些问题更适合不同的抽象。另请注意,除了这三个之外,还有很多并行化框架/抽象,一些在 BCL 中和许多第 3 方扩展/替换。
  • 还有并行foreach,我的问题是这里的最佳实践是什么?
  • @DarthVader:就像 Servy 所说,这首先是个人喜好问题。从学术的角度来看,我认为最好先掌握线程之类的低级内容,然后再转向 TPL 等更高级别的抽象,因为它们可以让您以更高效的方式编写更具可读性的代码。

标签: c# multithreading queue


【解决方案1】:

由于您已经在使用排队机制,我建议您使用BlockingCollection 而不是ConcurrentQueue,以及Parallel.Invoke()

BlockingCollection 有一些重要的东西使它很好用。

  1. BlockingCollection 允许消费线程使用 foreach 以线程安全和自然的方式从集合中获取项目。
  2. 消费foreach 循环在队列为空时自动阻塞,并在项目可用时继续。
  3. BlockingCollection 提供了一种易于使用的机制来发出数据结束的信号。队列所有者只需调用 queue.CompleteAdding(),任何从队列中获取项目的 foreach 循环都会在队列完全为空时自动退出。

您可以使用Parallel.Invoke() 启动多个线程,每个线程都使用foreach 来遍历队列。 (Parallel.Invoke() 允许你给它一个并行运行的任务数组,这使得它使用起来非常简单。)

最好用一个示例程序来说明这一点:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class User
    {
        public string Name;
    }

    class Program
    {
        readonly BlockingCollection<User> _queue = new BlockingCollection<User>();

        void run()
        {
            var background = Task.Factory.StartNew(process); // Start the processing threads.

            // Make up 50 sample users.
            var users = Enumerable.Range(0, 50).Select(n => new User{Name = n.ToString()});

            foreach (var user in users) // Add some sample data.
                _queue.Add(user);

            Console.WriteLine("Press <RETURN> to exit.");
            Console.ReadLine();
            _queue.CompleteAdding(); // Makes all the consuming foreach loops exit.
            background.Wait();
            Console.WriteLine("Exited.");
        }

        void process() // Process the input queue,
        {
            int taskCount = 4;  // Let's use 4 threads.
            var actions = Enumerable.Repeat<Action>(processQueue, taskCount);
            Parallel.Invoke(actions.ToArray());
        }

        void processQueue()
        {
            foreach (User user in _queue.GetConsumingEnumerable())
                processUser(user);
        }

        void processUser(User user)
        {
            Console.WriteLine("Processing user " + user.Name);
            Thread.Sleep(200); // Simulate work.
        }

        static void Main()
        {
            new Program().run();
        }
    }
}

如果您不需要限制并发线程的数量并且乐于让 .Net 为您决定(这不是一个坏主意),那么您可以通过完全删除 processQueue() 并更改process()转:

void process() // Process the input queue,
{
    Parallel.ForEach(_queue.GetConsumingEnumerable(), processUser);
}

但是,这比它需要的锁定更多,所以你最好只使用原始方法(不会遇到这个问题),或者使用这里描述的解决方案:http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

【讨论】:

  • 顺便说一句,Parallel.ForEach(_queue.GetConsumingEnumerable() 使用锁的次数超过了它必须使用的次数。而分块Parallel.ForEach 确实不适用于GetConsumingEnumerable()either。如果其中任何一个对您有问题,您应该改用GetConsumingPartitioner() from ParallelExtensionsExtras
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-09-08
  • 2016-08-30
  • 2014-05-06
  • 2011-10-23
  • 1970-01-01
  • 1970-01-01
  • 2023-03-13
相关资源
最近更新 更多