【问题标题】:EventHub ForEach Parallel AsyncEventHub ForEach 并行异步
【发布时间】:2018-11-07 18:41:12
【问题描述】:

总是设法让自己与异步工作混淆,我在这里进行了一些验证/确认,我正在做我认为我在以下情况下正在做的事情..

举个简单的例子:

// pretend / assume these are json msgs or something ;)
var strEvents = new List<string> { "event1", "event2", "event3" };

我可以将每个事件发布到 eventthub,如下所示:

foreach (var e in strEvents)
{
    // Do some things
    outEventHub.Add(e); // ICollector
}

foreach 将在单个线程上运行,并按顺序执行其中的每一件事。我猜到 eventthub 的发布也将保持在同一个线程上??

将 ICollector 改为 IAsyncCollector,实现以下效果:

foreach (var e in strEvents)
{
    // Do some things
    await outEventHub.AddAsync(e);
}

我想我是说 foreach 将在单个线程上运行,实际发送到事件中心将被推迟到其他地方?或者至少不阻塞同一个线程..

更改为 Parallel.ForEach 事件,因为这些事件一次将到达 100 多个左右:

 Parallel.ForEach(events, async (e) =>
 {
      // Do some things
      await outEventHub.AddAsync(e);
 });

现在开始有点朦胧,因为我不确定 真正 现在正在发生什么...... afaik 每个事件都有它自己的线程(在硬件范围内)和步骤在那个线程中不要阻塞它。所以这个简单的例子放在一边。

最后,我可以将它们全部转为我认为的任务..

 private static async Task DoThingAsync(string e, IAsyncCollector<string> outEventHub)
 {
      await outEventHub.AddAsync(e);
 }

 var t = new List<Task>();

 foreach (var e in strEvents)
 {
      t.Add(DoThingAsync(e, outEventHub));
 }

 await Task.WhenAll(t);

现在我真的很迷茫,我认为这是在单个线程上准备所有内容..然后在任何可用线程上同时运行所有内容??

我很感激为了确定哪个适合手头的工作,需要进行基准测试...但是现在解释框架在每种情况下所做的工作对我来说非常有帮助..

【问题讨论】:

  • 有点离题但是,假设我们在这里谈论的是 azure 事件中心,我建议你捆绑事件和 send events in a batch
  • 这不是太离题@PeterBons,但是是的,这是一个好主意,而且我们真的做的还不够多。我一定会考虑的 :)

标签: c# multithreading async-await


【解决方案1】:

并行!=异步

这是这里的主要思想。两者各有各的用途,可以一起使用,但是差别很大。您的假设基本正确,但让我澄清一下:

简单的foreach

这是非并行非异步。没什么好说的。

在 foreach 中等待

这是异步代码,非并行

foreach (var e in strEvents)
{
    // Do some things
    await outEventHub.AddAsync(e);
}

这一切都发生在一个线程上。它需要一个事件,开始将它添加到您的事件中心,并且在它完成时(我猜它会做某种网络IO)它将线程交还给线程池(如果它在 UI 线程上调用,则返回 UI),因此它可以在等待 AddAsync 返回的同时做其他工作。但正如你所说,is 根本不是平行的。

并行 Foreach(异步)

这是一个陷阱!简而言之,Parallel.Foreach 专为同步工作负载而设计。我们会回到这一点,但首先让我们假设您将它与非异步代码一起使用。

并行 foreach(同步)

又名。 并行但不异步。

Parallel.ForEach(events, (e) =>
 {
      // Do some things
      outEventHub.Add(e);
 });

每个项目都有自己的“任务”,但它们不会产生线程。创建线程的成本很高,在最佳情况下,线程数超过 CPU 内核数是没有意义的。取而代之的是,这些任务在 ThreadPool 上运行,它的线程数与最佳线程数一样多。每个线程接受一个任务,处理它,然后再接受另一个,等等。

您可以将其想象为 - 在 4 核机器上 - 有 4 个工作人员围绕着一堆任务,因此一次运行其中 4 个。您可以想象这在 IO 绑定工作负载的情况下并不理想(这很可能是)。如果您的网络很慢,您可以阻止所有 4 个线程尝试发送事件,而它们可能正在做有用的工作。这导致我们...

任务

异步和潜在的并行(取决于使用情况)。

您的描述在这里也是正确的,除了 ThreadPool,它会立即(在主线程上)启动所有任务,然后在池的线程上运行。当它们运行时,主线程被释放,然后可以根据需要执行其他工作。到目前为止,它与Parallel.Foreach 的情况相同。但是:

发生的情况是,TaskPool 线程拿起一个任务,进行必要的预处理,然后异步发出网络请求。这意味着该任务在等待网络时不会阻塞,而是释放 ThreadPool 线程以获取另一个工作项。当网络请求完成时,任务continuation(网络请求之后的剩余代码行)被安排回任务列表。

您可以看到理论上这是最有效的过程,速度如此之快,以至于您必须小心不要淹没您的网络。

回到 Parallel.Foreach 和异步

此时您应该能够发现问题所在。你所有的异步 lambda async (e) =&gt; { await outEventHub.AddAsync(e);} 正在做的就是开始工作,它会在它到达 await 后立即返回。 (请记住,async/await 在等待时释放线程。)Parallel.Foreach 在启动所有线程后立即返回。 但是没有什么在等待这些任务!这些变成一劳永逸,这通常是一种不好的做法。就像您从任务示例中删除了 await Task.WhenAll 调用。

我希望这为您解决了大部分问题,如果没有,请告诉我要改进的地方。

【讨论】:

  • 这绝对是一个极好的解释.. 里面有几件事我不得不停下来思考......但它真的很清楚,而且比试图拼凑要好得多来自无数在线文档的信息位.. re:我的'await inside foreach'示例确认不会同时执行多个 AddAsync() 作业...它仍然会按顺序执行吗?只有当您有其他代码/活动可以在同一个 foreach 迭代中处理事情时,这种模式才会变得有用..? ?
  • 感谢您的友好回复。:) 关于 foreach 内部的 await:(常规 foreach,对吗?)不,它不会。它将一一运行。然而,这种模式总是有用的,对于 IO 绑定的任务,如果可以的话,您应该始终使用 Async 方法。这样 CPU 线程在等待网络响应时被释放。它可以执行“全局任务列表”中的任何其他任务,它不必与您的 foreach 相关。最简单的例子是在 GUI 应用程序中它可以更新 GUI(因此它不会冻结),在 Web 应用程序中它可以处理另一个请求,等等
【解决方案2】:

为什么不并行异步发送这些事件,像这样:

var tasks = new List<Task>();

foreach( var e in strEvents )
{
   tasks.Add(outEventHub.AddAsync(e));
}

await Task.WhenAll(tasks);
await outEventHub.FlushAsync();

【讨论】:

  • 我想我最终确实最终这样做了..我明天上班时会标记答案:)
猜你喜欢
  • 1970-01-01
  • 2013-02-14
  • 1970-01-01
  • 1970-01-01
  • 2016-01-02
  • 1970-01-01
  • 1970-01-01
  • 2022-01-12
相关资源
最近更新 更多