【问题标题】:ArgumentOutOfRangeException on a ForEachAsync using Partitioner使用 Partitioner 的 ForEachAsync 上的 ArgumentOutOfRangeException
【发布时间】:2018-03-23 09:37:58
【问题描述】:

要设置上下文,应用程序在服务器上运行,其中 +-20 个其他应用程序正在执行一些多线程,但此过程仅适用于服务器上的 2 个应用程序。我从来没有在其他应用程序上遇到过这种错误,它都使用了 ForEachAsync 方法。在这个特定的应用程序上,我必须添加一些多线程,当我使用 ForEachAsync 时有时会出现此错误:

System.ArgumentOutOfRangeException: Specified argument was out of the range of valid values.
Parameter name: partitionCount
   at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.GetOrderablePartitions(Int32 partitionCount)
   at System.Collections.Concurrent.OrderablePartitioner`1.GetPartitions(Int32 partitionCount)
   at Common.AsyncHelper.ForEachAsync[T](IEnumerable`1 source, Func`2 taskSelector, Int32 maxParallelism) in ...\AsyncHelper.cs:line 15

方法如下:

public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> taskSelector, int maxParallelism)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(maxParallelism)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await taskSelector(partition.Current);
        }));
}

这是我的使用方法:

int parallel = list.Count() < 8 ? list.Count() : 8;

await list.ForEachAsync(async a => await Process(param1, param2),parallel);

我习惯了很多并行性吗?编辑:看起来空列表是问题所在。

这是一个最小的工作示例:

这是我的 AsyncHelper

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Common
{
    public static class AsyncHelper
    {
        public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> taskSelector, int maxParallelism)
        {
            return Task.WhenAll(
                from partition in Partitioner.Create(source).GetPartitions(maxParallelism)
                select Task.Run(async delegate {
                    using (partition)
                        while (partition.MoveNext())
                            await taskSelector(partition.Current);
                }));
        }        
    }
}

我想要做的是处理一个列表并将结果设置在另一个列表中,最大值为 // = 8:

var temp = new ConcurrentBag<TempResponse>();
int parallel = 1;
if(someList.Any(c => c.Valid))
    parallel = someList.Count(c => c.Valid) < 8 ? someList.Count(c => c.Valid)  : 8;

await someList.ForEachAsync(async a => temp.Add(await Process(a.Condition, a.State, a.Name)),parallel);

【问题讨论】:

  • 尝试把“.ToArray()”放在.GetPartitions(maxParallelism)之后会发生什么?问题也在于方法 GetPartiions - 所以我们需要它的代码来调查
  • 你为什么要这样做而不是使用Parallel.ForEach?它已经根据数据大小和核心数对数据进行了分区并且允许您设置 DOP。如果您不想阻止等待它,请在 Task.Run() 中运行它。这不会浪费线程 - Parallel.For 使用 current 线程以及线程池中的线程
  • 在这种情况下我真的很想手动设置并行度,这就是我使用这种方法的原因
  • @Gun 然后在Parallel.ForEach中设置ParallelismOptions.MaxDegreeOfParallelism 参数
  • @Gun 你真的想做什么?听起来您想一次处理一批任务,而不是并行处理。这更适合 ActionBlock。您可以为 ActionBlock 设置 DOP,然后开始向其发送消息。它只会并行处理 DOP 指定的尽可能多的消息。您还可以为其输入缓冲区设置一个限制,以避免未处理的消息淹没它

标签: c# multithreading asynchronous foreach


【解决方案1】:

此错误仅表示您将 0 作为 maxParallelism 传递给您的函数。

int parallel = list.Count() < 8 ? list.Count() : 8;

如果目标list 为空,则可以为零,而对于Partitioner.Create(...).GetPartitions() 调用,零是无效值。

所以,只需检查列表是否为空,如果是 - 什么也不做(没有理由在上面调用 ForEachAsync)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多