【问题标题】:ForEachAsync with ResultForEachAsync 与结果
【发布时间】:2022-03-17 20:46:23
【问题描述】:

我正在尝试将 Stephen Toub's ForEachAsync<T> 扩展方法更改为返回结果的扩展...

斯蒂芬的扩展:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

我的方法(不起作用;任务被执行但结果错误)

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () = 
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only one per partition 
            return default(TResult);
        }));
}

我知道我必须以某种方式返回 (WhenAll?) 最后一部分的结果,但我还不知道该怎么做...

更新:即使所有任务都已执行,我得到的结果也只是 degreeOfParallelism 次 null(我猜是因为 default(TResult))。我也试过return await body(...),然后结果很好,但只有degreeOfParallelism的任务数被执行了。

【问题讨论】:

  • "Result is wrong" 根本无法描述您所看到的内容。您返回default(TResult) 的事实似乎不是一个好的开始。如果您提供一个简短但完整的程序来演示问题,这将有所帮助,包括示例输入、预期输出和实际输出。 (我强烈怀疑你在这里想要SelectMany 而不是Select,基本上......)
  • @JonSkeet:添加了更新
  • 您不能列一个列表,将您的结果添加到该列表中,然后在完成所有操作后在最后返回吗?
  • @poke: 也想过,但我相信这不是真正的异步?!?
  • 您当前发布的代码无法编译,这无济于事...

标签: c# asynchronous concurrency task-parallel-library parallel.foreachasync


【解决方案1】:

您的 LINQ 查询的结果数只能与分区数相同 - 您只是将每个分区投影到一个结果中。

如果你不关心顺序,你只需要将每个分区的结果组装成一个列表,然后将它们展平即可。

public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
    var lists = await Task.WhenAll<List<TResult>>(
        Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
            .Select(partition => Task.Run<List<TResult>>(async () =>
                    {
                        var list = new List<TResult>();
                        using (partition)
                        {
                            while (partition.MoveNext())
                            {
                                list.Add(await body(partition.Current));
                            }
                        }
                        return list;
                   })));
     return lists.SelectMany(list => list).ToArray();
}

(我已将其从 ForEachAsync 重命名,因为 ForEach 听起来势在必行(适合原版中的 Func&lt;T, Task&gt;),而这是获取结果。foreach 循环没有结果 - 这个确实。)

【讨论】:

  • 哦对了,将列表保留在同步部分,然后返回列表。那挺好的!毕竟我猜我离得太远了。
  • @JonSkeet:无法编译(list.Add 而不是 results.Add),但它仍然无法正常工作
  • @Dunken:我已经修复了list.Add,但请提供更多信息,而不是“它仍然无法正常工作”——这对我来说并没有太多帮助。 (同样,如果您提供了一个简短但完整的程序来演示该问题,我自己测试它会更容易......)
  • @DunkenL:好的,如果你在我立即修复它之前这么说 - 该方法只需要是一个异步方法。 总是说出错误是什么,而不仅仅是“它不起作用”。
  • @Dunken:Doh,错字。应该是异步的而不是等待。现在试试……我自己也试试……
【解决方案2】:

现在Parallel.ForEachAsync API 已成为标准库 (.NET 6) 的一部分,因此基于此 API 实现返回 Task&lt;TResult[]&gt; 的变体是有意义的。这是一个实现:

public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    this IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    var results = new List<TResult>();
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    var withIndexes = source.Select((item, index) => (item, index));
    return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
    {
        var (item, index) = entry;
        var result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            while (results.Count <= index) results.Add(default);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        if (t.IsCanceled)
        {
            // Propagate the correct token
            CancellationToken ct = default;
            try { t.GetAwaiter().GetResult(); }
            catch (OperationCanceledException oce) { ct = oce.CancellationToken; }
            return Task.FromCanceled<TResult[]>(ct);
        }
        if (t.IsFaulted)
        {
            var tcs = new TaskCompletionSource<TResult[]>();
            tcs.SetException(t.Exception.InnerExceptions);
            return tcs.Task;
        }
        lock (results) return Task.FromResult(results.ToArray());
    }, default, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default)
        .Unwrap();
}

此实现支持 Parallel.ForEachAsync 重载的所有选项和功能,该重载具有 IEnumerable&lt;T&gt;source。它在错误和取消的情况下的行为是相同的。结果的排列顺序与source 序列中的关联元素相同。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-08-27
    • 1970-01-01
    • 1970-01-01
    • 2015-06-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多