【问题标题】:How can I run a large number of task with a defined degree of parallelism in C# using async / await? [duplicate]如何使用 async / await 在 C# 中以定义的并行度运行大量任务? [复制]
【发布时间】:2019-12-22 11:00:15
【问题描述】:

我有一个活动列表,我想获取每个活动的表演。现在我想拥有最多 10 个并行运行的“代理”,而不是更多。

这就是代码的样子,除了我想确保这些代码不是all并行运行。 allEvents 中可能有 1000 个元素。我想将其限制为最多并行 10 个。

    public async Task RefreshAll(Event[] allEvents)
    {
        var allPerformances = (await Task.WhenAll(allEvents.Select(CollectPerformancesForEvent))).SelectMany(x => x);
        // persist allPerformances
    }
    private async Task<Performance[]> CollectPerformancesForEvent(Event @event)
    {
        // some API call collecting all performances for @event...
        await Task.Delay(500);
        return new[]{ new Performance() };
    }

【问题讨论】:

标签: c# .net parallel-processing async-await


【解决方案1】:

这是一个可以做到这一点的扩展方法

public static async Task<T[]> RunParallel<T>(this IEnumerable<Task<T>> tasks, int maxDegreeOfParallelism)
{
    var enumerationLock = new object();
    var parallelTasks = new List<Task>(maxDegreeOfParallelism);
    var results = new ConcurrentBag<T>();

    using (var enumerator = tasks.GetEnumerator())
    {
        // spin up just a few 'agents' to process the tasks
        await Task.WhenAll(Enumerable
            .Range(0, maxDegreeOfParallelism)
            .Select(_ => Task.Run(async () =>
            {
                Task<T> task;
                do
                {
                    // we need to make sure threads 'de-queue' without interferance
                    lock (enumerationLock)
                    {
                        // pick next element if available
                        if (!enumerator.MoveNext())
                        {
                            return;
                        }
                        task = enumerator.Current;
                    }

                    // wait for task to finish and add aggregate results
                    results.Add(await task);
                } while (task != null);
            }))
        );
        // wait until all 'agents' are finished
        await Task.WhenAll(parallelTasks);
    }
    return results.ToArray();
}

然后你可以通过简单地使用RunParallel来改变调用并定义并行度:

var allPerformances = (await allEvents.Select(CollectPerformancesForEvent).RunParallel(10)).SelectMany(x => x);

【讨论】:

  • 您的方法很好,但实现缺少一些功能。 On error 不会很快失败,并且结果与任务的顺序不同。 Here 是一个更完整的实现。
猜你喜欢
  • 1970-01-01
  • 2017-01-06
  • 1970-01-01
  • 1970-01-01
  • 2017-06-28
  • 2013-03-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多