【发布时间】:2011-12-11 02:48:39
【问题描述】:
为什么在 MoveNext 返回 false 之前,Parallel.ForEach 将无法完成一系列任务的执行?
我有一个工具可以监控 MSMQ 和 Service Broker 队列的组合以获取传入消息。当找到一条消息时,它将该消息传递给适当的执行者。
我将消息检查包装在一个 IEnumerable 中,这样我就可以将 IEnumerable 和一个委托交给 Parallel.ForEach 方法来运行。该应用程序设计为在循环中连续运行 IEnumerator.MoveNext 处理,直到它能够开始工作,然后 IEnumerator.Current 为其提供下一项。
由于在我将 CancelToken 设置为 true 之前 MoveNext 永远不会消失,因此这应该永远继续处理。相反,我看到的是,一旦 Parallel.ForEach 接收到所有消息并且 MoveNext 不再返回“true”,就不再处理任何任务。相反,似乎 MoveNext 线程是唯一一个在等待它返回时给予任何工作的线程,而其他线程(包括等待和调度线程)不做任何工作。
- 有没有办法让 Parallel 在等待 MoveNext 的响应时继续工作?
- 如果没有,是否有另一种方法来构建 MoveNext 以获得我想要的? (让它返回 true,然后 Current 返回一个空对象会产生很多虚假任务)
- 额外问题:有没有办法限制 Parallel 一次发出多少条消息?它似乎一次完成并安排了很多消息(MaxDegreeOfParallelism 似乎只限制了它一次完成的工作量,并没有阻止它发出很多要安排的消息)
这是我所写内容的 IEnumerator(不含一些无关代码):
public class DataAccessEnumerator : IEnumerator<TransportMessage>
{
public TransportMessage Current
{ get { return _currentMessage; } }
public bool MoveNext()
{
while (_cancelToken.IsCancellationRequested == false)
{
TransportMessage current;
foreach (var task in _tasks)
{
if (task.QueueType.ToUpper() == "MSMQ")
current = _msmq.Get(task.Name);
else
current = _serviceBroker.Get(task.Name);
if (current != null)
{
_currentMessage = current;
return true;
}
}
WaitHandle.WaitAny(new [] {_cancelToken.WaitHandle}, 500);
}
return false;
}
public DataAccessEnumerator(IDataAccess<TransportMessage> serviceBroker, IDataAccess<TransportMessage> msmq, IList<JobTask> tasks, CancellationToken cancelToken)
{
_serviceBroker = serviceBroker;
_msmq = msmq;
_tasks = tasks;
_cancelToken = cancelToken;
}
private readonly IDataAccess<TransportMessage> _serviceBroker;
private readonly IDataAccess<TransportMessage> _msmq;
private readonly IList<JobTask> _tasks;
private readonly CancellationToken _cancelToken;
private TransportMessage _currentMessage;
}
这是 Parallel.ForEach 调用,其中 _queueAccess 是保存上述 IEnumerator 的 IEnumerable,RunJob 处理从该 IEnumerator 返回的 TransportMessage:
var parallelOptions = new ParallelOptions
{
CancellationToken = _cancelTokenSource.Token,
MaxDegreeOfParallelism = 8
};
Parallel.ForEach(_queueAccess, parallelOptions, x => RunJob(x));
【问题讨论】:
-
也许响应式扩展可能会有所帮助? msdn.microsoft.com/en-us/data/gg577609
标签: c# .net asynchronous task-parallel-library