【发布时间】:2017-11-21 18:24:27
【问题描述】:
感谢 PLYNQ 专家提供的任何帮助!我会花时间查看答案,我在 math.SE 上的个人资料更完善。
我有一个 ParallelQuery<List<string>> 类型的对象,它有 44 个我想并行处理的列表(比如说,一次 5 个)。
我的流程有一个类似的签名
private ProcessResult Process(List<string> input)
处理将返回一个结果,这是一对布尔值,如下所示。
private struct ProcessResult
{
public ProcessResult(bool initialised, bool successful)
{
ProcessInitialised = initialised;
ProcessSuccessful = successful;
}
public bool ProcessInitialised { get; }
public bool ProcessSuccessful { get; }
}
问题。给定一个IEnumerable<List<string>> processMe,我的PLYNQ 查询尝试实现这个方法:https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx。写成
processMe.AsParallel()
.Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>
(
new ConcurrentStack<ProcessResult>, //aggregator seed
(agg, input) =>
{ //updating the aggregate result
var res = Process(input);
agg.Push(res);
return agg;
},
agg =>
{ //obtain the result from the aggregator agg
ProcessResult res; // (in this case just the most recent result**)
agg.TryPop(out res);
return res;
}
);
不幸的是,它不是并行运行,而是顺序运行。 (** 请注意,此实现没有“意义”,我只是想让并行化暂时起作用。)
我尝试了一个稍微不同的实现,它确实并行运行,但没有聚合。我定义了一个聚合方法(它本质上是ProcessResult 的两个部分上的布尔AND,即聚合([A1,A2],[B1,B2])≡[A1 && B1,A2 && B2])。
private static ProcessResult AggregateProcessResults
(ProcessResult aggregate, ProcessResult latest)
{
bool ini = false, suc = false;
if (aggregate.ProcessInitialised && latest.ProcessInitialised)
ini = true;
if (aggregate.ProcessSuccessful && latest.ProcessSuccessful)
suc = true;
return new ProcessResult(ini, suc);
}
并使用了 PLYNQ 查询https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx
.Aggregate<List<string>, ProcessResult, ProcessResult>(
new ProcessResult(true, true),
(res, input) => Process(input),
(agg, latest) => AggregateProcessResults(agg, latest),
agg => agg
这里的问题是 AggregateProcessResults 代码从未被命中,出于某种原因——我不知道结果在哪里......
感谢阅读,感谢您的帮助:)
【问题讨论】:
-
如果你想为序列中的每一项计算一个新值,你应该使用
Select,而不是Aggregate。当你对你正在尝试做的工作使用正确的操作时,你会发现系统将能够更有效地完成它。 -
您的收藏中有多少件? (只有 44 个?)你有多少个 CPU 内核?因为在多个 Treads 和多个 CPU 内核上运行查询需要复杂的准备工作。必须将集合拆分为尽可能多的可用 CPU 内核的部分,在线程上运行任务,最后聚合结果。所以 .NET 足够聪明,不会做很多工作来让一切变得更慢......
-
@Major 我有 22000 个字符串,分批成 500 个,给出 44 个列表
。我仅限于同时运行五个进程 -
@Servy 我想我明白了,但如果不明白,我很抱歉:我可以使用
Select来获得ParallelQuery<ProcessResult>。尽管之后我仍然需要汇总它,以获得单个ProcessResult。聚合不需要并行,但ParallelQuery.Aggregate的重点肯定是避免将其分成两个步骤? :l -
尽管如此,如果您想进行一些昂贵的计算以将每个值转换为不同的值,然后聚合它们,您应该仍然使用
Select进行映射值,然后Aggregate聚合它们,而不是尝试在Aggregate中完成所有操作。但实际上,您正在映射值,然后甚至不聚合它们,因此根本没有理由使用Aggregate。
标签: c# .net multithreading parallel-processing plinq