【问题标题】:Is it OK to do some async/await inside some .NET Parallel.ForEach() code?在一些 .NET Parallel.ForEach() 代码中做一些异步/等待可以吗?
【发布时间】:2014-07-06 22:46:21
【问题描述】:

鉴于以下代码,在Parallel.ForEach 中执行async/await 是否OK

例如。

Parallel.ForEach(names, name =>
{
    // Do some stuff...

    var foo = await GetStuffFrom3rdPartyAsync(name);

    // Do some more stuff, with the foo.
});

还是有一些我需要注意的问题?

编辑:不知道这是否可以编译,顺便说一句。只是伪代码.. 大声思考。

【问题讨论】:

    标签: c# .net parallel-processing async-await parallel.foreach


    【解决方案1】:

    不,将asyncParalell.Foreach 组合起来没有意义。

    考虑以下示例:

    private void DoSomething()
    {
        var names = Enumerable.Range(0,10).Select(x=> "Somename" + x);
        Parallel.ForEach(names, async(name) =>
        {   
            await Task.Delay(1000);
            Console.WriteLine("Name {0} completed",name);
        });
        Console.WriteLine("Parallel ForEach completed");
    }
    

    你会期待什么输出?

    Name Somename3 completed
    Name Somename8 completed
    Name Somename4 completed
    ...
    Parallel ForEach completed
    

    这不会发生。它将输出:

    Parallel ForEach completed
    Name Somename3 completed
    Name Somename8 completed
    Name Somename4 completed
    ...
    

    为什么?因为当ForEach 第一个命中await 时,该方法实际返回,Parallel.ForEach 不知道它是异步的,它运行完成!。 await 之后的代码在另一个线程上作为延续运行不是“并行处理线程”

    Stephen toub addressed this here

    【讨论】:

    • 好的 - 那么你有什么建议呢?
    • @Pure.Krome 您试图解决的问题是什么?在给定的背景下,很难回答。如果可以选择等待,您可以Task.Run(async ()=> await Something()).Wait();
    【解决方案2】:

    根据名称,我假设 GetStuffFrom3rdPartyAsync 是 I/O 绑定的。 Parallel 类专门用于 CPU 绑定代码。

    在异步世界中,您可以使用Task.WhenAll 启动多个任务,然后(异步)等待它们全部完成。由于您从一个序列开始,将每个元素投影到一个异步操作可能是最简单的,然后等待所有这些操作:

    await Task.WhenAll(names.Select(async name =>
    {
      // Do some stuff...
      var foo = await GetStuffFrom3rdPartyAsync(name);
      // Do some more stuff, with the foo.
    }));
    

    【讨论】:

      【解决方案3】:

      一个接近的选择可能是这样的:

      static void ForEach<T>(IEnumerable<T> data, Func<T, Task> func)
      {
          var tasks = data.Select(item => 
              Task.Run(() => func(item)));
      
          Task.WaitAll(tasks.ToArray());
      }
      
      // ... 
      
      ForEach(names, name => GetStuffFrom3rdPartyAsync(name));
      

      理想情况下,您不应该使用像Task.WaitAll 这样的阻塞调用,如果您可以在当前调用堆栈上“一直向下”调用async 的整个方法链:

      var tasks = data.Select(item => 
          Task.Run(() => func(item)));
      
      await Task.WhenAll(tasks.ToArray());
      

      此外,如果您在 GetStuffFrom3rdPartyAsync 内不做任何 CPU 密集型工作,Task.Run 可能是多余的:

      var tasks = data.Select(item => func(item));
      

      【讨论】:

        【解决方案4】:

        正如@Sriram Sakthivel 所指出的,将Parallel.ForEach 与异步lambda 一起使用存在一些问题。 Steven Toub 的ForEachASync 可以做到这一点。他谈论它here,但这里是代码:

        public static class Extensions
        {
            public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
            {
                return Task.WhenAll(
                    from partition in Partitioner.Create(source).GetPartitions(dop)
                    select Task.Run(async delegate {
                                                       using (partition) while (partition.MoveNext()) await body(partition.Current);
                    }));
            }
        }
        

        它使用Partitioner 类创建负载平衡分区器(doco),并允许您使用dop 参数指定要运行的线程数。看看它和Parallel.ForEach 的区别。试试下面的代码。

         class Program
            {
                public static async Task GetStuffParallelForEach()
                {
                    var data = Enumerable.Range(1, 10);
                    Parallel.ForEach(data, async i =>
                    {
                        await Task.Delay(1000 * i);
                        Console.WriteLine(i);
                    });
                }
        
                public static async Task GetStuffForEachAsync()
                {
                    var data = Enumerable.Range(1, 10);
                    await data.ForEachAsync(5, async i =>
                    {
                        await Task.Delay(1000 * i);
                        Console.WriteLine(i);
                    });
        
                }
        
                static void Main(string[] args)
                {
                    //GetStuffParallelForEach().Wait(); // Finished printed before work is complete
                    GetStuffForEachAsync().Wait(); // Finished printed after all work is done
                    Console.WriteLine("Finished");
                    Console.ReadLine();
                }
        

        如果您运行GetStuffForEachAsync,程序会等待所有工作完成。如果你运行GetStuffParallelForEach,那么Finished这一行将在工作完成之前被打印出来。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2013-01-08
          • 2020-12-02
          • 2022-12-10
          • 2018-10-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2023-02-03
          相关资源
          最近更新 更多