【问题标题】:Parallel.ForEach with async lambda waiting forall iterations to complete带有异步 lambda 的 Parallel.ForEach 等待所有迭代完成
【发布时间】:2020-02-24 00:50:36
【问题描述】:

最近我看到几个与 Parallel.ForEach 相关的 SO 线程与异步 lambda 混合,但所有建议的答案都是某种解决方法。

有什么办法可以写:

List<int> list = new List<int>[]();

Parallel.ForEach(arrayValues, async (item) =>
{
  var x = await LongRunningIoOperationAsync(item);
  list.Add(x);
});

如何确保列表包含每次迭代中使用 lambdas 执行的所有迭代中的所有项目?

Parallel.ForEach 通常如何与异步 lambda 一起工作,如果它命中 await 是否会将其线程移交给下一次迭代?

我假设 ParallelLoopResult IsCompleted 字段不是正确的,因为它会在所有迭代执行时返回 true,无论它们的实际 lambda 作业是否完成?

【问题讨论】:

  • await 将强制释放任务线程,并且它不保证在等待的任务完成时同一线程将继续。但是是的,Parallel ForEach 将保证所有迭代都完成并且 x 被添加到列表中,除非发生异常。
  • 您最好使用var results = await Task.WhenAll(arrayvalues.Select(x =&gt; LongRunningIoOperationAsync(x)))。 Parallel 更适合 CPU 密集型工作,而不是 IO 密集型。
  • 您的实现不是线程安全的。您不能在Parallel.ForEach 内调用list.Add(x)
  • @ldragicevic 如果你想限制并发异步操作的数量,请查看here

标签: c# task-parallel-library parallel.foreach


【解决方案1】:

最近我看到几个与 Parallel.ForEach 相关的 SO 线程与异步 lambda 混合,但所有建议的答案都是某种解决方法。

嗯,那是因为 Parallel 不适合 async。从不同的角度来看,你为什么要首先混合它们?他们做相反的事情Parallel 是关于添加线程,async 是关于放弃线程。如果你想同时做异步工作,那么使用Task.WhenAll。那是工作的正确工具; Parallel 不是。

也就是说,听起来你想使用错误的工具,所以这是你的做法......

如何确保列表包含每次迭代中使用 lambdas 执行的所有迭代中的所有项目?

在处理完成之前,您需要有某种信号可以阻止某些代码,例如 CountdownEventMonitor。附带说明一下,您还需要保护对非线程安全 List&lt;T&gt; 的访问。

Parallel.ForEach 通常如何与异步 lambda 一起工作,如果它命中 await 是否会将其线程移交给下一次迭代?

由于Parallel 不理解async lambda,当第一个await 产生(返回)给它的调用者时,Parallel 将假定循环的交互已完成。

我假设 ParallelLoopResult IsCompleted 字段不是正确的,因为它会在所有迭代执行时返回 true,无论它们的实际 lambda 作业是否完成?

正确。据Parallel 所知,它只能“看到”第一个await 返回其调用者的方法。所以它不知道async lambda 何时完成。它还会假设迭代过早完成,从而导致分区失效。

【讨论】:

  • 总是很好的阅读和雄辩的写作。请注意@ldragicevic,您接受的投票应放在此处
  • 是的,当然。 @Stephen 非常感谢您的澄清!记住这些事实会更有意义。
【解决方案2】:

你不需要Parallel.For/ForEach在这里你只需要等待一个任务列表。

背景

简而言之,您需要非常小心异步 lambdas,如果您将它们传递给 ActionFunc&lt;Task&gt;

您的问题是因为Parallel.For / ForEach 不适合异步和等待模式IO 绑定任务。它们适用于 cpu 绑定的工作负载。这意味着它们本质上具有Action 参数,让任务调度程序为您创建任务

如果您想同时运行多个 async 任务,请使用 Task.WhenAllTPL 数据流 Block(或类似的东西) 可以有效地处理 CPU boundIO bound 工作负载,或者更直接地说,它们可以处理 tasks 异步方法是。

除非你需要在你的 lambda 中做更多的事情(你还没有展示),否则只需使用 aSelectWhenAll

var tasks = items .Select(LongRunningIoOperationAsync);
var results = await Task.WhenAll(tasks); // here is your list of int

如果你这样做了,你仍然可以使用等待,

var tasks = items.Select(async (item) =>
   {
       var x = await LongRunningIoOperationAsync(item);
       // do other stuff
       return x;
   });

var results = await Task.WhenAll(tasks);

注意:如果你需要Parallel.ForEach的扩展功能(即控制最大并发的选项),有几种方法,但是RX或DataFlow 可能是最简洁的

【讨论】:

  • 谢谢,只有当 c# 程序可以处理任意长度的项目时,我才有点好奇,例如如果我通过 1000 个项目,那么安排一些有限数量的线程以少量线程的批次执行所有 1000 个任务会很聪明
  • @ldragicevic 默认任务调度程序将根据工作负载、内核和启发式方法限制并发
  • 感谢您的回复。我只担心是否需要处理并行性资源,但根据之前的评论,没关系。
猜你喜欢
  • 1970-01-01
  • 2020-02-22
  • 1970-01-01
  • 2018-07-17
  • 2016-07-07
  • 2016-03-16
  • 1970-01-01
  • 2021-04-22
  • 2011-10-04
相关资源
最近更新 更多