【问题标题】:Simple Parallel Tasks with Continuation带有延续的简单并行任务
【发布时间】:2017-02-28 01:49:31
【问题描述】:

我已经阅读了 2 个小时,但我仍然感到困惑。有人说使用 StartNew,有人说 Task.Run,​​有人说别的。我知道 Task.Run 给了我一个编译错误。

我需要并行启动多个任务,然后在每个任务成功完成后执行后续任务。知道何时完成所有阻塞会很有帮助。

这是我所拥有的:

    public void DoSomeWork(object workItem)
    {
        var tasks = new Task<ResultArgs>[_itemList.Count];

        for (int loopCnt = 0; loopCnt < _itemList.Count; loopCnt++)
        {
            tasks[loopCnt] = new Task<ResultArgs>.Run(() =>
            {
                return _itemList[loopCnt].Analyze(workItem);
            });
            tasks[loopCnt].ContinueWith(ReportResults, TaskContinuationOptions.ExecuteSynchronously);
        }
    }

编译显示任务中不存在Run。

显然,我有一些东西在运行,但我不知道是什么。

我该如何解决这个问题?

【问题讨论】:

  • Task.Run 仅存在于 4.5 中,不存在于 4.0 中。

标签: c# c#-4.0 task-parallel-library


【解决方案1】:

您可以使用async 方法进行操作,也可以将您的项目流入数据流中,以下代码使用Tpl-dataflow 处理您的项目,将它们传递到您的第二个处理步骤,最后等待处理完成。

using NUnit.Framework;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace AsyncProcessing {

    [TestFixture]
    public class PipelineTests {

        [Test]
        public async Task RunPipeline() {
            var pipeline = new MyPipeline();
            var data = Enumerable.Range(0, 1000).Select(x => new WorkItem(x, x));

            foreach(var item in data) {
                await pipeline.SendAsync(item);
            }

            pipeline.Complete();
            await pipeline.Completion;

            //all processing complete            
        }
    }

    class MyPipeline {

        private BufferBlock<WorkItem> inputBuffer;
        private TransformBlock<WorkItem, WorkItem> analyzeBlock;
        private TransformBlock<WorkItem, ResultArg> reportBlock;
        private ActionBlock<ResultArg> postOutput;

        public ConcurrentBag<ResultArg> OutputBuffer { get; }
        public Task Completion { get { return postOutput.Completion; } }

        public MyPipeline() {
            OutputBuffer = new ConcurrentBag<ResultArg>();
            CreatePipeline();
            LinkPipeline();
        }

        public void Complete() {
            inputBuffer.Complete();
        }

        public async Task SendAsync(WorkItem data) {
            await inputBuffer.SendAsync(data);
        }

        public void CreatePipeline() {
            var options = new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 10
            };

            inputBuffer = new BufferBlock<WorkItem>(options);

            analyzeBlock = new TransformBlock<WorkItem, WorkItem>(item => {
                //Anylyze item....
                return item;
            }, options);

            reportBlock = new TransformBlock<WorkItem, ResultArg>(item => {
                //report your results, email.. db... etc.
                return new ResultArg(item.JobId, item.WorkValue);
            }, options);

            postOutput = new ActionBlock<ResultArg>(item => {
                OutputBuffer.Add(item);
            }, options);
        }

        public void LinkPipeline() {
            var options = new DataflowLinkOptions() {
                PropagateCompletion = true,
            };

            inputBuffer.LinkTo(analyzeBlock, options);
            analyzeBlock.LinkTo(reportBlock, options);
            reportBlock.LinkTo(postOutput, options);
        }
    }

    public class WorkItem {

        public int JobId { get; set; }
        public int WorkValue { get; set; }

        public WorkItem(int id, int workValue) {
            this.JobId = id;
            this.WorkValue = workValue;
        }
    }

    public class ResultArg {

        public int JobId { get; set; }
        public int Result { get; set; }

        public ResultArg(int id, int result) {
            this.JobId = id;
            this.Result = result;
        }
    }
}

【讨论】:

  • 我最终使用了您对异步方法的建议。我认为我说得对,因为编译器没有大惊小怪。我们会在我开始测试时发现!我还将研究我以前从未见过的 TPL 数据流。
  • 查看数据流速成课程的编辑 :) 还有Intro to dataflow
  • 谢谢!更多阅读!斯蒂芬的东西通常很好,值得一读。感谢您的链接!
【解决方案2】:

为什么不使用 Parallel.ForEach 循环。这用于并行执行任务,它可以使用多个线程并且执行速度更快 Parallet.Foreach

但是,如果您正在执行一些涉及锁定的与数据库相关的输入输出操作,它可能会失败。在这种情况下,我建议在每个任务中保留一个返回类型,并根据前一个任务的返回类型启用 ext 任务。

【讨论】:

  • 我曾考虑过 Parallel.ForEach,但被警告说它不会将我的所有任务都放在准备运行队列中。它每次都会限制数量,并且由于近乎实时的限制,我担心吞吐量。但是,这确实提醒我我需要担心异常!
  • @AeroClassics async 如果使用默认任务调度程序,也有类似的限制。
  • 哦,提琴手。这不是我真正想听到的,而是我需要知道的。谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-12-27
  • 1970-01-01
  • 2015-09-14
  • 1970-01-01
  • 2014-06-17
  • 2021-12-21
  • 2014-03-28
相关资源
最近更新 更多