【问题标题】:TPL Queue ProcessingTPL 队列处理
【发布时间】:2013-02-18 04:42:05
【问题描述】:

我目前正在做一个项目,我需要排队处理一些工作,这是要求:

  1. 作业必须一次处理一个
  2. 必须能够等待排队的项目

所以我想要类似的东西:

Task<result> QueueJob(params here)
{
   /// Queue the job and somehow return a waitable task that will wait until the queued job has been executed and return the result.
}

我尝试过一个后台运行任务,它只是将项目从队列中拉出并处理作业,但困难在于从后台任务到方法。

如果需要,我可以只在 QueueJob 方法中请求完成回调,但如果我能得到一个透明的任务返回,让您等待作业被处理(即使队列中还有工作)。

【问题讨论】:

    标签: c# queue task-parallel-library


    【解决方案1】:

    您可能会发现TaskCompletionSource&lt;T&gt; 很有用,它可以用来创建一个Task,它可以在您想要的时候准确地完成。如果将其与BlockingCollection&lt;T&gt; 结合使用,您将获得队列:

    class JobProcessor<TInput, TOutput> : IDisposable
    {
        private readonly Func<TInput, TOutput> m_transform;
    
        // or a custom type instead of Tuple
        private readonly
            BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>
            m_queue =
            new BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>();
    
        public JobProcessor(Func<TInput, TOutput> transform)
        {
            m_transform = transform;
            Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
        }
    
        private void ProcessQueue()
        {
            Tuple<TInput, TaskCompletionSource<TOutput>> tuple;
            while (m_queue.TryTake(out tuple, Timeout.Infinite))
            {
                var input = tuple.Item1;
                var tcs = tuple.Item2;
    
                try
                {
                    tcs.SetResult(m_transform(input));
                }
                catch (Exception ex)
                {
                    tcs.SetException(ex);
                }
            }
        }
    
        public Task<TOutput> QueueJob(TInput input)
        {
            var tcs = new TaskCompletionSource<TOutput>();
            m_queue.Add(Tuple.Create(input, tcs));
            return tcs.Task;
        }
    
        public void Dispose()
        {
            m_queue.CompleteAdding();
        }
    }
    

    【讨论】:

    • 为我以前没见过的东西点赞!这可能很有用,现在我有一个可行的解决方案。
    【解决方案2】:

    我会选择这样的:

    class TaskProcessor<TResult>
    {
        // TODO: Error handling!
    
        readonly BlockingCollection<Task<TResult>> blockingCollection = new BlockingCollection<Task<TResult>>(new ConcurrentQueue<Task<TResult>>());
    
        public Task<TResult> AddTask(Func<TResult> work)
        {
            var task = new Task<TResult>(work);
            blockingCollection.Add(task);
            return task; // give the task back to the caller so they can wait on it
        }
    
        public void CompleteAddingTasks()
        {
            blockingCollection.CompleteAdding();
        }
    
        public TaskProcessor()
        {
            ProcessQueue();
        }
    
        void ProcessQueue()
        {
            Task<TResult> task;
            while (blockingCollection.TryTake(out task))
            {
                task.Start();
                task.Wait(); // ensure this task finishes before we start a new one...
            }
        }
    }
    

    根据使用它的应用程序的类型,您可以将 BlockingCollection/ConcurrentQueue 切换为更简单的东西(例如,只是一个普通队列)。您还可以根据要排队的方法/参数来调整“AddTask”方法的签名...

    【讨论】:

    • 更有效的方法是使用task.RunSynchronously()
    • 另外,您的代码将无法工作,因为如果集合为空,TryTake() 的重载会立即返回 false。您需要使用允许您指定超时并将其设置为无穷大的重载。
    • 还有一件事:你直接从构造函数开始ProcessQueue()。如果ProcessQueue() 工作正常,构造函数将永远不会返回。
    • @svick 所有的优点,我主要是从记忆中快速删除了这一点,我想我应该在帖子中提到这一点。 (而且我最近两天一直生病,这让我远离了……)
    【解决方案3】:

    Func&lt;T&gt; 不带任何参数,返回类型为 T 的值。作业一个一个运行,您可以等待返回的任务获得结果。

    public class TaskQueue
    {
        private Queue<Task> InnerTaskQueue;
    
        private bool IsJobRunning;
    
        public void Start()
        {
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    if (InnerTaskQueue.Count > 0 && !IsJobRunning)
                    {
                         var task = InnerTaskQueue.Dequeue()
                         task.Start();
                         IsJobRunning = true;
                         task.ContinueWith(t => IsJobRunning = false);
                    }
                    else
                    {
                         Thread.Sleep(1000);
                    }
                }
            }
        }
    
        public Task<T> QueueJob(Func<T> job)
        {
            var task = new Task<T>(() => job());
            InnerTaskQueue.Enqueue(task);
            return task;
        }
    }
    

    【讨论】:

    • 虽然这并没有完全解决我的特定问题,但它是我所做工作的一个很好的框架,并以一种很好的通用方式解决了问题,所以我将它标记为答案。谢谢!
    • 就睡眠而言,这是线程池上的单个线程,所以我们不会暂停整个线程,线程池足够智能,可以进行优化和适当的调度。在这一点上,这听起来像是挑剔...... Stack Overflow 是关于在人们需要帮助的地方提供指导,在这种情况下,听起来我能够为他指出正确的方向。当然,如果性能很紧张,他会在需要的地方进行优化,但这些都不会改变如何对代表进行排队等实际解决方案,这就是这个问题的意义所在。
    • “带有 while 循环的单个线程一次一个地从队列中取出一个项目是线程安全的。”但是一个线程出队项目和另一个线程入队它们不是。 (顺便说一句,因为你没有@-notify我,我不知道你回复了我,所以我才回复这么晚。)
    • “所以我们不会暂停整个线程” 除了你。如果您阻止线程等待,则该线程不能用于其他任何事情。它是ThreadPool 线程这一事实并没有改变任何事情。而且我确实认为 SO 答案应该促进良好的实践,而不仅仅是有效的东西。 (尽管您的代码甚至没有这样做。)
    • 当人们一起合作、互相帮助而不是试图踩对方的喉咙来表明谁“更正确”时,一个社区就会蓬勃发展。您有一些很好的建议,所以感谢您的输入和愉快的编码:)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-03-07
    • 2012-04-10
    • 2013-05-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多