【发布时间】:2018-03-23 09:37:58
【问题描述】:
要设置上下文,应用程序在服务器上运行,其中 +-20 个其他应用程序正在执行一些多线程,但此过程仅适用于服务器上的 2 个应用程序。我从来没有在其他应用程序上遇到过这种错误,它都使用了 ForEachAsync 方法。在这个特定的应用程序上,我必须添加一些多线程,当我使用 ForEachAsync 时有时会出现此错误:
System.ArgumentOutOfRangeException: Specified argument was out of the range of valid values.
Parameter name: partitionCount
at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.GetOrderablePartitions(Int32 partitionCount)
at System.Collections.Concurrent.OrderablePartitioner`1.GetPartitions(Int32 partitionCount)
at Common.AsyncHelper.ForEachAsync[T](IEnumerable`1 source, Func`2 taskSelector, Int32 maxParallelism) in ...\AsyncHelper.cs:line 15
方法如下:
public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> taskSelector, int maxParallelism)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(maxParallelism)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await taskSelector(partition.Current);
}));
}
这是我的使用方法:
int parallel = list.Count() < 8 ? list.Count() : 8;
await list.ForEachAsync(async a => await Process(param1, param2),parallel);
我习惯了很多并行性吗?编辑:看起来空列表是问题所在。
这是一个最小的工作示例:
这是我的 AsyncHelper
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Common
{
public static class AsyncHelper
{
public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> taskSelector, int maxParallelism)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(maxParallelism)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await taskSelector(partition.Current);
}));
}
}
}
我想要做的是处理一个列表并将结果设置在另一个列表中,最大值为 // = 8:
var temp = new ConcurrentBag<TempResponse>();
int parallel = 1;
if(someList.Any(c => c.Valid))
parallel = someList.Count(c => c.Valid) < 8 ? someList.Count(c => c.Valid) : 8;
await someList.ForEachAsync(async a => temp.Add(await Process(a.Condition, a.State, a.Name)),parallel);
【问题讨论】:
-
尝试把“.ToArray()”放在.GetPartitions(maxParallelism)之后会发生什么?问题也在于方法 GetPartiions - 所以我们需要它的代码来调查
-
你为什么要这样做而不是使用
Parallel.ForEach?它已经根据数据大小和核心数对数据进行了分区并且允许您设置 DOP。如果您不想阻止等待它,请在Task.Run()中运行它。这不会浪费线程 - Parallel.For 使用 current 线程以及线程池中的线程 -
在这种情况下我真的很想手动设置并行度,这就是我使用这种方法的原因
-
@Gun 然后在
Parallel.ForEach中设置ParallelismOptions.MaxDegreeOfParallelism 参数 -
@Gun 你真的想做什么?听起来您想一次处理一批任务,而不是并行处理。这更适合 ActionBlock
。您可以为 ActionBlock 设置 DOP,然后开始向其发送消息。它只会并行处理 DOP 指定的尽可能多的消息。您还可以为其输入缓冲区设置一个限制,以避免未处理的消息淹没它
标签: c# multithreading asynchronous foreach