【问题标题】:Howto: Parallel.Foreach executes many processes, after each process run a new process (but one at a time)? [closed]如何:Parallel.Foreach 执行许多进程,每个进程运行一个新进程后(但一次一个)? [关闭]
【发布时间】:2012-09-07 19:01:17
【问题描述】:

我相信有人知道这一点,我将非常感谢您的回答。我对委托和异步等知之甚少 - 所以请给我一个如何实现的一般示例。

我有一个工作流程,其中 我可以使用 Parallel.Foreach 同时为许多不同的文件执行一个方法(甜,研磨那个处理器) - 但是在该方法结束后,我需要运行另一种方法(它会生成关于前一个进程的报告), 并且第二种方法不能并行运行。

我不想在生成报告之前等待 Parallel.ForEach 中的所有文件完成(这不是必需的)。但是,如果我在第一种方法结束时开始报告生成方法,那么我就会遇到问题。有什么排队之类的吗?一定有一些漂亮的方法,对吧?

谢谢

【问题讨论】:

  • 那么您希望什么时候开始生成报告?您是否希望将 foreach 的结果排队,直到达到特定阈值然后开始报告?
  • 在对文件执行第一种方法之后,我想立即开始执行第二种方法 - 但是第二种方法一次只能处理一个“文件”。所以许多进程正在运行,其中一个完成,开始报告,另一个许多进程完成,等待第一个报告然后开始第二个报告生成。
  • 是的,我实际上正在执行一个启动进程(另一个应用程序)的方法/函数,但这是一大堆令人困惑的语义。
  • 有没有办法编辑标题/问题?

标签: c# task-parallel-library .net-4.5 parallel.foreach


【解决方案1】:

我认为 Jim G 的意思是:

var lockObj = new object();

Parallel.Foreach(files, file => 
{
    // Processing file
    lock(lockObj)
    {
        // Generate report.
    }
});

【讨论】:

  • 很好,这很完美。有趣,我不确定我是否完全直觉(直觉是一个词还是可以这样使用,我觉得很聪明......或愚蠢)无论如何,谢谢,我会尝试理解这一点以及 Arbiter.Interleave这似乎是一个非常强大的工具。
【解决方案2】:

第二种方法应该链接为continuation task

在第二种方法中,使用lock 或互斥锁以确保它不会并行运行。

【讨论】:

    【解决方案3】:

    并发和协调运行时 (CCR) 中的 Arbiter.Interleave 协调原语提供了一种实现所需功能的简单方法。基本上你传递了 3 个接收器组,1 个用于并发任务,1 个用于独占任务(不并行执行),1 个用于关闭整个进程。你可以找到一个如何使用它的例子here

    【讨论】:

      【解决方案4】:

      另一种选择是使用生产者-消费者模型。您有一个thread safe blocking collection,您将完成的数据放入其中,然后您有一个线程来运行从该集合中提取数据的报告。

      //Collection to hold the data the processed files generated
      var proccesedDataItems = new new BlockingCollection<ResultData>();
      
      //A thread that processes the files
      var processReports = new Task(() =>
      {
          //Removes items from the collection, if the collection is empty it blocks
          // or if "CompletedAdded" has been called it will reach the "end" of the 
          // collection
          foreach(var processedData in proccesedDataItems.GetConsumingEnumerable())
          {
              BuildReport(processedData);
          }
      });
      processReports.Start();    
      
      //Generating the data
      Parallel.Foreach(files, file => 
      {
         var proccesedData = ProcessFile(file)
         proccesedDataItems.Add(processedData);
      });
      
      //Let anyone consuming the collection that you can stop waiting for new items.
      proccesedDataItems.CompleteAdding();
      

      【讨论】:

        【解决方案5】:

        这样的事情非常适合 TPL 数据流模型:您创建一个处理文件的并行块,然后创建另一个生成报告的非并行块:

        var processFileBlock = new TransformBlock<File, Result>(
            file => ProcessFile(file),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });
        
        var generateReportBlock = new ActionBlock<Result>(
            result => GenerateReport(result));
        
        processFileBlock.LinkTo(generateReportBlock);
        
        foreach (var file in files)
            processFileBlock.Post(file);
        

        如果您还想等到所有处理完成,则需要使用Complete()Completetion 添加一些代码。

        【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-06-30
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-12-19
        相关资源
        最近更新 更多