【问题标题】:Parallel.ForEach fails to execute messages on long running IEnumerableParallel.ForEach 无法在长时间运行的 IEnumerable 上执行消息
【发布时间】:2011-12-11 02:48:39
【问题描述】:

为什么在 MoveNext 返回 false 之前,Parallel.ForEach 将无法完成一系列任务的执行?

我有一个工具可以监控 MSMQ 和 Service Broker 队列的组合以获取传入消息。当找到一条消息时,它将该消息传递给适当的执行者。

我将消息检查包装在一个 IEnumerable 中,这样我就可以将 IEnumerable 和一个委托交给 Parallel.ForEach 方法来运行。该应用程序设计为在循环中连续运行 IEnumerator.MoveNext 处理,直到它能够开始工作,然后 IEnumerator.Current 为其提供下一项。

由于在我将 CancelToken 设置为 true 之前 MoveNext 永远不会消失,因此这应该永远继续处理。相反,我看到的是,一旦 Parallel.ForEach 接收到所有消息并且 MoveNext 不再返回“true”,就不再处理任何任务。相反,似乎 MoveNext 线程是唯一一个在等待它返回时给予任何工作的线程,而其他线程(包括等待和调度线程)不做任何工作。

  • 有没有办法让 Parallel 在等待 MoveNext 的响应时继续工作?
  • 如果没有,是否有另一种方法来构建 MoveNext 以获得我想要的? (让它返回 true,然后 Current 返回一个空对象会产生很多虚假任务)
  • 额外问题:有没有办法限制 Parallel 一次发出多少条消息?它似乎一次完成并安排了很多消息(MaxDegreeOfParallelism 似乎只限制了它一次完成的工作量,并没有阻止它发出很多要安排的消息)

这是我所写内容的 IEnumerator(不含一些无关代码):

public class DataAccessEnumerator : IEnumerator<TransportMessage> 
{
    public TransportMessage Current
    {   get { return _currentMessage; } }

    public bool MoveNext()
    {
        while (_cancelToken.IsCancellationRequested == false)
        {
            TransportMessage current;
            foreach (var task in _tasks)
            {
                if (task.QueueType.ToUpper() == "MSMQ")
                    current = _msmq.Get(task.Name);
                else
                    current = _serviceBroker.Get(task.Name);

                if (current != null)
                {
                    _currentMessage = current;
                    return true;
                }
            }
            WaitHandle.WaitAny(new [] {_cancelToken.WaitHandle}, 500); 
        }

        return false; 
    }

    public DataAccessEnumerator(IDataAccess<TransportMessage> serviceBroker, IDataAccess<TransportMessage> msmq, IList<JobTask> tasks, CancellationToken cancelToken)
    {
        _serviceBroker = serviceBroker;
        _msmq = msmq;
        _tasks = tasks;
        _cancelToken = cancelToken;
    }

    private readonly IDataAccess<TransportMessage> _serviceBroker;
    private readonly IDataAccess<TransportMessage> _msmq;
    private readonly IList<JobTask> _tasks;
    private readonly CancellationToken _cancelToken;
    private TransportMessage _currentMessage;
}

这是 Parallel.ForEach 调用,其中 _queueAccess 是保存上述 IEnumerator 的 IEnumerable,RunJob 处理从该 IEnumerator 返回的 TransportMessage:

var parallelOptions = new ParallelOptions
    {
        CancellationToken = _cancelTokenSource.Token,
        MaxDegreeOfParallelism = 8 
    };

Parallel.ForEach(_queueAccess, parallelOptions, x => RunJob(x));

【问题讨论】:

标签: c# .net asynchronous task-parallel-library


【解决方案1】:

在我看来,Parallel.ForEach 并不适合您想做的事情。我建议你改用BlockingCollection&lt;T&gt; 创建一个生产者/消费者队列——创建一堆线程/任务来为阻塞集合提供服务,并在它们到达时向其中添加工作项。

【讨论】:

    【解决方案2】:

    我还没有努力确保这一点,但我从 Parallel.ForEach 的讨论中得到的印象是,它会将所有项目从可枚举项中拉出,它们会就如何划分做出适当的决定它们跨线程。根据您的问题,这似乎是正确的。

    因此,为了保留您当前的大部分代码,您可能应该将阻塞代码从迭代器中拉出,并将其放入围绕对 Parallel.ForEach(使用迭代器)的调用的循环中。

    【讨论】:

      【解决方案3】:

      您的问题可能与正在使用的分区器有关。

      在您的情况下,TPL 将选择块分区器,它将从枚举中获取多个项目,然后再将它们传递给进行处理。每个块中所取的项目数会随着时间的推移而增加。

      当您的 MoveNext 方法阻塞时,TPL 会等待下一个项目,并且不会处理它已经占用的项目。

      你有几个选项来解决这个问题:

      1) 编写一个始终返回单个项目的分区程序。不像听起来那么棘手。

      2) 使用 TPL 代替 Parallel.ForEach

      foreach ( var item in _queueAccess )
      {
          var capturedItem = item;
      
          Task.Factory.StartNew( () => RunJob( capturedItem ) );
      }
      

      第二种解决方案稍微改变了行为。 foreach 循环将在所有 Tasks 已创建后完成,而不是在它们完成后完成。如果这对您来说是个问题,您可以添加CountdownEvent

      var ce = new CountdownEvent( 1 );
      
      foreach ( var item in _queueAccess )
      {
          ce.AddCount();
      
          var capturedItem = item;
      
          Task.Factory.StartNew( () => { RunJob( capturedItem ); ce.Signal(); } );
      }
      
      ce.Signal();
      ce.Wait();
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2010-12-31
        • 1970-01-01
        • 1970-01-01
        • 2011-06-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多