【Question Title】: Launching an unknown number of parallel tasks with return values
【Posted】: 2021-04-01 10:02:46
【Question Description】:

I have the following method:

private async Task<string> MakeRequestAsync(string id)
{
    // ...
}

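I have a list of IDs supplied by the user.

I want to call MakeRequestAsync() once for each ID, running as many of the calls in parallel as is practical. I need the result of each call, and I want to detect exceptions.

I've looked at a lot of examples but can't figure out how to do this. For example, I found Parallel.ForEach(), but its body parameter is an Action<> and does not return a value. I'm also not sure what the maximum number of tasks I should create is.

Can anyone show me how you would do this, or link to a good article?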

【Question Discussion】:

Tags: c# async-await parallel-processing task task-parallel-library


【Solution 1】:

You can do this easily with await Task.WhenAll, but I often use the convenience methods below. You can use them like this:

(string, ArgumentException)[] results = await idList
    .ForeachParallelSafe<string, string, ArgumentException>(MakeRequestAsync);

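The results are in the same order as the source collection.

If you are fine with only the first exception being thrown, you can also do this: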

string[] results = await idList
    .ForeachParallel(MakeRequestAsync);

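The optional degreeOfParallelism parameter lets you limit the maximum number of parallel executions. There is also an overload that takes an elementSelector parameter, in case you need to combine each original input element with its output, for example if you need to create a Dictionary.

Here is the code (for convenience I have included all the overloads; pick the ones you need):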


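        // Requires: using System; using System.Collections.Generic; using System.Linq;
        //           using System.Threading; using System.Threading.Tasks;
        // Extension methods must be declared inside a non-generic static class.
        /// <summary>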
        /// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
        /// awaits the result of executions. If the execution of <paramref name="asyncFunc" /> throws an exception of type
        /// <typeparamref name="TException" />, it is caught and returned in the result.
        /// </summary>
        public static Task<(TOut Result, TException Exception)[]>
            ForeachParallelSafe<TIn, TOut, TException>(this IEnumerable<TIn> source,
                Func<TIn, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
            where TException : Exception
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));

            async Task<(TOut Result, TException Exception)> safeFunc(TIn input)
            {
                try
                {
                    return (await asyncFunc(input), null);
                }
                catch (TException e)
                {
                    return (default, e);
                }
            }

            return ForeachParallel(source, safeFunc, (orig, output) => output, degreeOfParallelism);
        }

        /// <summary>
        /// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
        /// awaits the result of executions. The returned items are the result of applying <paramref name="elementSelector" />
        /// to each of the original items and the resulting items.
        /// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
        /// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
        /// </summary>
        public static async Task<TResult[]> ForeachParallel<T, TOut, TResult>(this IEnumerable<T> source,
            Func<T, Task<TOut>> asyncFunc, Func<T, TOut, TResult> elementSelector,
            int degreeOfParallelism = -1)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
            if (elementSelector == null) throw new ArgumentNullException(nameof(elementSelector));

            // Copy the source into an array to avoid multiple enumerations.
            // Could be optimized to avoid copying in certain cases but this
            // is usually negligible compared to the async operation.
            T[] sourceCopy = source.ToArray();

            SemaphoreSlim semaphore = null;
            if (degreeOfParallelism > 0)
            {
                semaphore = new SemaphoreSlim(degreeOfParallelism, degreeOfParallelism);
            }

            
            TOut[] intermediateResults = await Task.WhenAll(sourceCopy
                .Select(async x =>
                {
                    if (semaphore != null)
                    {
                        await semaphore.WaitAsync();
                    }

                    try
                    {
                        return await asyncFunc(x);
                    }
                    finally
                    {
                        semaphore?.Release();
                    }
                }));


            TResult[] result = sourceCopy
                .Select((x, index) => elementSelector(x, intermediateResults[index]))
                .ToArray();

            semaphore?.Dispose();

            return result;
        }

        /// <summary>
        /// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
        /// awaits the end of all executions.
        /// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
        /// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
        /// </summary>
        public static Task ForeachParallel<T>(this IEnumerable<T> source,
            Func<T, Task> asyncFunc, int degreeOfParallelism = -1)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));

            // Await asyncFunc so that exceptions and cancellation propagate to the caller.
            async Task<int> asyncTask(T t)
            {
                await asyncFunc(t);
                return 0;
            }

            return ForeachParallel(source, asyncTask, (orig, output) => output, degreeOfParallelism);
        }

        /// <summary>
        /// Asynchronously executes the given <paramref name="asyncFunc" /> on each item in <paramref name="source" /> and
        /// awaits the result of executions.
        /// If a task throws an exception, only the first such exception is thrown by this method after all tasks have completed.
        /// Consider using <see cref="ForeachParallelSafe{TIn,TOut,TException}"/> if all exceptions are needed.
        /// </summary>
        public static Task<TOut[]> ForeachParallel<T, TOut>(this IEnumerable<T> source,
            Func<T, Task<TOut>> asyncFunc, int degreeOfParallelism = -1)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (asyncFunc == null) throw new ArgumentNullException(nameof(asyncFunc));
            return ForeachParallel(source, asyncFunc, (orig, output) => output, degreeOfParallelism);
        }
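
For comparison, here is a minimal sketch of the plain Task.WhenAll approach mentioned at the start of this answer, without the helper methods. The class name WhenAllDemo, the method RunAllAsync, and the body of MakeRequestAsync are hypothetical stand-ins for illustration. Awaiting Task.WhenAll rethrows only the first exception, so the sketch inspects each task afterwards to recover every result and every failure.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Hypothetical stand-in for the question's MakeRequestAsync.
    private static async Task<string> MakeRequestAsync(string id)
    {
        await Task.Delay(10);
        if (id == "bad") throw new ArgumentException("Invalid id: " + id);
        return "result-" + id;
    }

    // Starts one request per id and returns one (Result, Error) pair per id,
    // in the same order as the input.
    public static async Task<(string Result, Exception Error)[]> RunAllAsync(
        IEnumerable<string> ids)
    {
        // ToList forces the Select to run now, so each task is started exactly once.
        List<Task<string>> tasks = ids.Select(id => MakeRequestAsync(id)).ToList();

        try
        {
            await Task.WhenAll(tasks);
        }
        catch (Exception)
        {
            // Only the first exception is rethrown here; each task is inspected below.
        }

        var results = new (string Result, Exception Error)[tasks.Count];
        for (int i = 0; i < tasks.Count; i++)
        {
            Task<string> t = tasks[i];
            if (t.Status == TaskStatus.RanToCompletion)
            {
                results[i] = (t.Result, null);
            }
            else
            {
                // Faulted tasks expose an AggregateException; canceled tasks expose none.
                Exception error = t.Exception?.InnerException ?? new TaskCanceledException(t);
                results[i] = (null, error);
            }
        }
        return results;
    }
}
```

Calling await WhenAllDemo.RunAllAsync(new[] { "a", "bad", "c" }) yields three entries in input order; the second has a null Result and an ArgumentException in Error. This version starts every request at once, so it does not limit the degree of parallelism.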


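【Discussion】:

  • I'm avoiding your generic routines and the ReSharper stuff and trying to understand the basics. Can you explain what await semaphore.WaitAsync() does? Also, what does Then(Task.WhenAll) do? Then is undefined for me.
  • @JonathanWood Sorry, I overlooked Then. It is inlined now. The semaphore is only used if you need to limit the degree of parallelism. WaitAsync basically lets up to degreeOfParallelism threads into the critical section at any one time. Also have a look here: stackoverflow.com/questions/10806951/…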
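
To illustrate the semaphore behavior described in the comments above, here is a minimal, self-contained sketch. The names ThrottleDemo and RunThrottledAsync are hypothetical, and Task.Delay stands in for the real asynchronous request. Each work item awaits semaphore.WaitAsync() before it starts and releases the slot in a finally block, so no more than limit items run at once; the method returns the highest concurrency it observed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs itemCount simulated operations with at most 'limit' in flight,
    // and returns the highest number that were observed running together.
    public static async Task<int> RunThrottledAsync(int itemCount, int limit)
    {
        object gate = new object();
        int inFlight = 0;
        int maxObserved = 0;

        using (var semaphore = new SemaphoreSlim(limit, limit))
        {
            async Task WorkAsync(int item)
            {
                // Asynchronously waits for a free slot without blocking a thread.
                await semaphore.WaitAsync();
                try
                {
                    lock (gate)
                    {
                        inFlight++;
                        maxObserved = Math.Max(maxObserved, inFlight);
                    }

                    await Task.Delay(20); // stand-in for the real async request

                    lock (gate)
                    {
                        inFlight--;
                    }
                }
                finally
                {
                    // Always hand the slot back, even if the work throws.
                    semaphore.Release();
                }
            }

            await Task.WhenAll(Enumerable.Range(0, itemCount).Select(i => WorkAsync(i)));
        }

        return maxObserved;
    }
}
```

For example, await ThrottleDemo.RunThrottledAsync(10, 3) returns a value that never exceeds 3. All ten tasks are created immediately; the semaphore only controls how many of them are past the WaitAsync call at any moment.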