【问题标题】:Possible reasons why ParallelQuery.Aggregate does not run in parallelParallelQuery.Aggregate 不并行运行的可能原因
【发布时间】:2017-11-21 18:24:27
【问题描述】:

感谢 PLYNQ 专家提供的任何帮助!我会花时间查看答案,我在 math.SE 上的个人资料更完善。

我有一个 ParallelQuery<List<string>> 类型的对象,它有 44 个我想并行处理的列表(比如说,一次 5 个)。 我的流程有一个类似的签名

private ProcessResult Process(List<string> input)

处理将返回一个结果,这是一对布尔值,如下所示。

    private struct ProcessResult
    {
        public ProcessResult(bool initialised, bool successful)
        {
            ProcessInitialised = initialised;
            ProcessSuccessful = successful;
        }

        public bool ProcessInitialised { get; }
        public bool ProcessSuccessful { get; }
    }

问题。给定一个IEnumerable&lt;List&lt;string&gt;&gt; processMe,我的PLYNQ 查询尝试实现这个方法:https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx。写成

processMe.AsParallel()
         .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>
             (
                 new ConcurrentStack<ProcessResult>,   //aggregator seed
                 (agg, input) =>
                 {                         //updating the aggregate result
                     var res = Process(input);
                     agg.Push(res);
                     return agg;
                 },
                 agg => 
                 {                         //obtain the result from the aggregator agg
                     ProcessResult res;    // (in this case just the most recent result**)
                     agg.TryPop(out res);
                     return res;
                 }
             );

不幸的是,它不是并行运行,而是顺序运行。 (** 请注意,此实现没有“意义”,我只是想让并行化暂时起作用。)


我尝试了一个稍微不同的实现,它确实并行运行,但没有聚合。我定义了一个聚合方法(它本质上是ProcessResult 的两个部分上的布尔AND,即聚合([A1,A2],[B1,B2])≡[A1 && B1,A2 && B2])。

private static ProcessResult AggregateProcessResults
        (ProcessResult aggregate, ProcessResult latest)
    {
        bool ini = false, suc = false;
        if (aggregate.ProcessInitialised && latest.ProcessInitialised)
            ini = true;
        if (aggregate.ProcessSuccessful && latest.ProcessSuccessful)
            suc = true;


        return new ProcessResult(ini, suc);
    }

并使用了 PLYNQ 查询https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

.Aggregate<List<string>, ProcessResult, ProcessResult>(
    new ProcessResult(true, true),
    (res, input)  => Process(input),
    (agg, latest) => AggregateProcessResults(agg, latest),
    agg           => agg

这里的问题是 AggregateProcessResults 代码从未被命中,出于某种原因——我不知道结果在哪里......

感谢阅读,感谢您的帮助:)

【问题讨论】:

  • 如果你想为序列中的每一项计算一个新值,你应该使用Select,而不是Aggregate。当你对你正在尝试做的工作使用正确的操作时,你会发现系统将能够更有效地完成它。
  • 您的收藏中有多少件? (只有 44 个?)你有多少个 CPU 内核?因为在多个 Treads 和多个 CPU 内核上运行查询需要复杂的准备工作。必须将集合拆分为尽可能多的可用 CPU 内核的部分,在线程上运行任务,最后聚合结果。所以 .NET 足够聪明,不会做很多工作来让一切变得更慢......
  • @Major 我有 22000 个字符串,分批成 500 个,给出 44 个列表。我仅限于同时运行五个进程
  • @Servy 我想我明白了,但如果不明白,我很抱歉:我可以使用Select 来获得ParallelQuery&lt;ProcessResult&gt;。尽管之后我仍然需要汇总它,以获得单个ProcessResult。聚合不需要并行,但ParallelQuery.Aggregate 的重点肯定是避免将其分成两个步骤? :l
  • 尽管如此,如果您想进行一些昂贵的计算以将每个值转换为不同的值,然后聚合它们,您应该仍然使用Select 进行映射值,然后 Aggregate 聚合它们,而不是尝试在 Aggregate 中完成所有操作。但实际上,您正在映射值,然后甚至不聚合它们,因此根本没有理由使用 Aggregate

标签: c# .net multithreading parallel-processing plinq


【解决方案1】:

根据设计,您使用的Aggregate 的重载确实不会并行运行。您传递种子,然后传递阶跃函数,但阶跃函数 (agg) 的参数是从 上一步 步骤接收的累加器。出于这个原因,它本质上是顺序的(上一步的结果被输入到下一步)并且不可并行化。不确定为什么 ParallelEnumerable 会包含此重载,但可能是有原因的。

改为使用另一个重载:

var result = processMe
.AsParallel()
.Aggregate
(
    // seed factory. Each partition will call this to get its own seed
    () => new ConcurrentStack<ProcessResult>(),
    // process element and update accumulator
    (agg, input) =>
    {                                           
        var res = Process(input);
        agg.Push(res);
        return agg;
    },
    // combine accumulators from different partitions
    (agg1, agg2) => {
        agg1.PushRange(agg2.ToArray());
        return agg1;
    },
    // reduce
    agg =>
    {
        ProcessResult res;
        agg.TryPop(out res);
        return res;
    }
);

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-14
    • 2015-03-24
    • 1970-01-01
    • 2014-09-16
    相关资源
    最近更新 更多