【问题标题】:"Infinite" asynchronous parallel foreach loop“无限”异步并行 foreach 循环
【发布时间】:2020-08-20 07:00:24
【问题描述】:

我有一个包含 50K 到 100K 字的 List<string>

我想以并行异步方式迭代它

例如,我可以使用

while (true)
{
   Parallel.ForEach(words, new ParallelOptions { MaxDegreeOfParallelism = 100 }, ...)
}

但问题是:

  1. Parallel.ForEach 不是异步
  2. 当我们到达列表的末尾时,我们必须等待每个线程结束,然后while (true) 语句才能继续
  3. 这意味着没有始终运行 100 个线程,这正是我想要的

我怎样才能做到这一点?

如果这令人困惑,或者我不善于解释,请告诉我。

【问题讨论】:

  • 您几乎肯定不希望始终运行 100 个线程...
  • 1) 为什么要异步?考虑到它在无限循环中,这将是危险的。另外,您不应该async/awaitParallel.xxx 一起使用 2) 所以 3) 也许,也许不是。 Parallel.xxx 的动态线程池大小调整非常聪明。您可能需要考虑 TPL DataFlow,因为它支持管道;限制和节流,并且与异步配合得很好。 TPLDF 同时满足 CPU 密集型和 I/O 密集型作业
  • 如果您实际上想要同时运行 100 个线程(同样,您真的不希望 - 您一定会感到困惑),那么您将不得不手动使用 Thread 对象。 .Net 中的异步实现将自行处理线程以达到最佳数量,除非您有 128 个核心服务器,否则它不会一次分配 100 个运行。任何时候你使用 await 一个线程池线程都会获得延续,并且很可能不会有 100 个线程池线程
  • "我正在使用 HttpClient" - 那么你肯定不想启动 “100 个线程”。启动一个只是等待 I/O 的线程是对线程的浪费。可悲的是,启动一个只处理一个网络链接的线程更糟糕。 Parallel.xxxasync/await 很危险,您希望后者支持 IOCP。最好使用 TPL DataFlow 和 async/await。对于这种事情来说,这真是太棒了。 Parallel.ForEach 最适合用于受 CPU 限制的操作,而您的操作似乎不是
  • 这听起来像是一个 X/Y 问题 - 为什么你认为你需要同时 100 个?

标签: c# asynchronous parallel-processing async-await


【解决方案1】:

这是一个完全做作的 async 友好的 TPL DataFlow 示例,说明如何实现您的要求。

  1. 适用于异步 IO 工作负载
  2. 可以取消
  3. 它限制了最大并行度
  4. 它的容量有限,因此始终有 100 个工作可用
  5. 它是无限的

给定

private static CancellationTokenSource _cs;
private static CancellationToken _token;
private static ActionBlock<string> _block;

private static async Task MethodAsync(string something)
{
   // Your async workload
}

public static async Task EndlessRunner(string[] someArray)
{
   try
   {
      var index = 0;
      while (!_token.IsCancellationRequested)
      {
         await _block.SendAsync(someArray[index],_token);
         if (++index >= someArray.Length) index = 0;
      }
   }
   catch (OperationCanceledException)
   {
      Console.WriteLine("Cancelled");
   }
}

示例

private static async Task Main()
{
   _cs = new CancellationTokenSource();
   _token = _cs.Token;

   _block = new ActionBlock<string>(
      MethodAsync, 
      new ExecutionDataflowBlockOptions()
      {
         EnsureOrdered = false,
         MaxDegreeOfParallelism = 100,
         BoundedCapacity = 100,
         CancellationToken = _cs.Token,
         SingleProducerConstrained = true
      });

   var someList = Enumerable
      .Range(0,5000)
      .Select(I => $"something {I}")
      .ToArray();

   Task.Run(() => EndlessRunner(someList));

   Console.ReadKey();

   _cs.Cancel();

   _block.Complete();
   await _block.Completion;

}

【讨论】:

  • 这正是我一直在寻找的东西,而我自己可能永远无法做到这一点。在使用之前,我会确保完全理解您的代码及其工作原理,如果我有任何问题,我会回复您。非常感谢!
  • @Matt 不要忘记安装 nuget
  • 所以我有一个问题。做这个工作一样吗?我相信它会,但只是确保。 i.imgur.com/d6Ia1Ep.jpg
  • @Matt 取消时要小心,它的效果需要保持一致。如果代码的一个分支可以抛出OperationCanceledException,那么另一个分支不应该只检查IsCancellationRequested,然后像什么都没发生一样退出。考虑改为调用ThrowIfCancellationRequested() 方法。
  • @TheodorZoulias 你完全正确。感谢您的提示!
猜你喜欢
  • 1970-01-01
  • 2020-05-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-12-12
  • 2017-10-31
相关资源
最近更新 更多