【问题标题】:Workaround for the WaitHandle.WaitAll 64 handle limit?WaitHandle.WaitAll 64 句柄限制的解决方法?
【发布时间】:2010-04-23 23:31:53
【问题描述】:

我的应用程序通过ThreadPool.QueueUserWorkItem 产生大量不同的小型工作线程,我通过多个ManualResetEvent 实例跟踪这些线程。我使用WaitHandle.WaitAll 方法阻止我的应用程序在这些线程完成之前关闭。

我以前从未遇到过任何问题,但是,由于我的应用程序正在承受更多负载,即创建更多线程,我现在开始遇到此异常:

WaitHandles must be less than or equal to 64 - missing documentation

什么是最好的替代解决方案?

代码片段

List<AutoResetEvent> events = new List<AutoResetEvent>();

// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
    // do work
    evt.Set();
});

...
WaitHandle.WaitAll(events.ToArray());

解决方法

int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);

...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
    try
    {
         // do work
    }
    finally
    {
        if (Interlocked.Decrement(ref threadCount) == 0)
        {
             finished.Set();
        }
    }
});

...
finished.WaitOne();

【问题讨论】:

  • 如果您迁移到 .NET 4,请查看 CountdownEvent。它将计数包装在一个整洁的包装中。

标签: c# multithreading waithandle


【解决方案1】:

创建一个跟踪正在运行的任务数量的变量:

int numberOfTasks = 100;

创建信号:

ManualResetEvent signal = new ManualResetEvent(false);

每当任务完成时减少任务数量:

if (Interlocked.Decrement(ref numberOftasks) == 0)
{

如果没有剩余任务,设置信号:

    signal.Set();
}

同时,在其他地方等待信号被设置:

signal.WaitOne();

【讨论】:

  • @dtb:因此,与其维护 MRE 列表,不如维护计数,并在每个线程完成后递减(如果计数达到 0 则触发)?
  • 没错。 MRE 有点重量级,所以 64 位限制是有原因的。尽量避免它们太多:-)
  • 太棒了,还有一件事。我不完全知道动态建立列表的线程数在执行任何工作之前增加每个新线程内的计数然后在最后减少是否安全?
  • 不,不要增加线程中的 numberOfTasks。这可能会导致 numberOfTasks 达到 0 但尚未启动所有任务的情况。在调用 ThreadPool.QueueUserWorkItem 之前增加 numberOfTasks,因此可以保证在调用 signal.WaitOne() 之前所有任务都已启动。使用 Interlocked.Increment 增加 numberOfTasks。
  • 任务中的任何代码 (// do work) 都可能引发异常。因此,您应该确保,如果抛出异常,numberOfTasks 会正确减少 (try ... finally)。不要担心您的 UserWorkItem 可能不会被执行——它会的。最终。
【解决方案2】:

从 .NET 4.0 开始,您可以使用另外两个(以及 IMO,更简洁的)选项。

首先是使用CountdownEvent class。它避免了必须自己处理递增和递减的需要:

int tasks = <however many tasks you're performing>;

// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
    // Queue work.
    ThreadPool.QueueUserWorkItem(() => {
        // Do work
        ...

        // Signal when done.
        e.Signal();
    });

    // Wait till the countdown reaches zero.
    e.Wait();
}

但是,还有一个更强大的解决方案,那就是使用Task class,如下所示:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Factory.StartNew(() => {
        // Do work.
    }

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Task.WaitAll(tasks);

使用Task 类和对WaitAll 的调用更简洁,IMO,因为您在整个代码中编织的线程原语更少(注意,没有等待句柄);您不必设置计数器,处理递增/递减,您只需设置您的任务,然后等待它们。这让代码在您想要做什么的what 中更具表现力,而不是how 的原语(至少在管理它的并行化方面)。

.NET 4.5 提供了更多选项,您可以通过调用static Run method on the Task class 来简化Task 实例序列的生成:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Run(() => {
        // Do work.
    })

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Tasks.WaitAll(tasks);

或者,您可以利用 TPL DataFlow library(它位于 System 命名空间中,因此它是官方的,即使它是从 NuGet 下载的,如 Entity Framework)并使用 ActionBlock&lt;TInput&gt;,如下所示:

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
    // Do work.
});

// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);

// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();

// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();

注意ActionBlock&lt;TInput&gt;默认一次处理一个项目,所以如果你想让它一次处理多个动作,你必须在构造函数中设置你想要处理的并发项目的数量通过传递一个ExecutionDataflowBlockOptions 实例并设置MaxDegreeOfParallelism property:

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

如果您的操作是真正线程安全的,那么您可以将MaxDegreeOfParallelsim 属性设置为DataFlowBlockOptions.Unbounded

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded
});

重点是,您可以细粒度地控制如何您希望您的选项是平行的。

当然,如果您想要将一系列项目传递到您的 ActionBlock&lt;TInput&gt; 实例中,那么您可以链接一个 ISourceBlock&lt;TOutput&gt; 实现来提供 ActionBlock&lt;TInput&gt;,如下所示:

// The buffer block.
var buffer = new BufferBlock<int>();

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<int>(o => {
    // Do work.
});

// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (link = buffer.LinkTo(actionBlock, 
    // Want to propagate completion state to the action block.
    new DataflowLinkOptions {
        PropagateCompletion = true,
    },
    // Can filter on items flowing through if you want.
    i => true)
{ 
    // Post 100 times to the *buffer*
    foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);

    // Signal complete, this doesn't actually stop
    // the block, but says that everything is done when the currently
    // posted items are completed.
    actionBlock.Complete();

    // Wait for everything to complete, the Completion property
    // exposes a Task which can be waited on.
    actionBlock.Completion.Wait();
}

根据您需要做什么,TPL Dataflow 库成为一个更具吸引力的选择,因为它处理所有链接在一起的任务的并发性,并且它可以让您非常具体地只是了解您希望每个部分的并行程度,同时保持每个块的关注点的适当分离。

【讨论】:

    【解决方案3】:

    您的解决方法不正确。原因是SetWaitOne 可能会竞争,如果最后一个工作项导致threadCount 变为零排队线程不得不排队 /em> 工作项。修复很简单。将排队线程视为工作项本身。将threadCount初始化为1,并在排队完成时进行减量并发出信号。

    int threadCount = 1;
    ManualResetEvent finished = new ManualResetEvent(false);
    ...
    Interlocked.Increment(ref threadCount); 
    ThreadPool.QueueUserWorkItem(delegate 
    { 
        try 
        { 
             // do work 
        } 
        finally 
        { 
            if (Interlocked.Decrement(ref threadCount) == 0) 
            { 
                 finished.Set(); 
            } 
        } 
    }); 
    ... 
    if (Interlocked.Decrement(ref threadCount) == 0)
    {
      finished.Set();
    }
    finished.WaitOne(); 
    

    作为个人喜好,我喜欢使用 CountdownEvent 类来为我计算。

    var finished = new CountdownEvent(1);
    ...
    finished.AddCount();
    ThreadPool.QueueUserWorkItem(delegate 
    { 
        try 
        { 
             // do work 
        } 
        finally 
        { 
          finished.Signal();
        } 
    }); 
    ... 
    finished.Signal();
    finished.Wait(); 
    

    【讨论】:

      【解决方案4】:

      添加到 dtb 的答案中,您可以将其包装成一个不错的简单类。

      public class Countdown : IDisposable
      {
          private readonly ManualResetEvent done;
          private readonly int total;
          private long current;
      
          public Countdown(int total)
          {
              this.total = total;
              current = total;
              done = new ManualResetEvent(false);
          }
      
          public void Signal()
          {
              if (Interlocked.Decrement(ref current) == 0)
              {
                  done.Set();
              }
          }
      
          public void Wait()
          {
              done.WaitOne();
          }
      
          public void Dispose()
          {
              ((IDisposable)done).Dispose();
          }
      }
      

      【讨论】:

        【解决方案5】:

        当我们想要回调时添加到 dtb 的答案中。

        using System;
        using System.Runtime.Remoting.Messaging;
        using System.Threading;
        
        class Program
        {
            static void Main(string[] args)
            {
                Main m = new Main();
                m.TestMRE();
                Console.ReadKey();
        
            }
        }
        
        class Main
        {
            CalHandler handler = new CalHandler();
            int numberofTasks =0;
            public void TestMRE()
            {
        
                for (int j = 0; j <= 3; j++)
                {
                    Console.WriteLine("Outer Loop is :" + j.ToString());
                    ManualResetEvent signal = new ManualResetEvent(false);
                    numberofTasks = 4;
                    for (int i = 0; i <= 3; i++)
                    {
                        CalHandler.count caller = new CalHandler.count(handler.messageHandler);
                        caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal);
                    }
                    signal.WaitOne();
                }
        
            }
        
            private void NumberCallback(IAsyncResult result)
            {
                AsyncResult asyncResult = (AsyncResult)result;
        
                CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;
        
                int num = caller.EndInvoke(asyncResult);
        
                Console.WriteLine("Number is :"+ num.ToString());
        
                ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
                if (Interlocked.Decrement(ref numberofTasks) == 0)
                {
                    mre.Set();
                }
            }
        
        }
        public class CalHandler
        {
            public delegate int count(int number);
        
            public int messageHandler ( int number )
            {
                return number;
            }
        
        }
        

        【讨论】:

        • 请忽略命名约定,它只是快速代码。
        【解决方案6】:
        protected void WaitAllExt(WaitHandle[] waitHandles)
        {
            //workaround for limitation of WaitHandle.WaitAll by <=64 wait handles
            const int waitAllArrayLimit = 64;
            var prevEndInd = -1;
            while (prevEndInd < waitHandles.Length - 1)
            {
                var stInd = prevEndInd + 1;
                var eInd = stInd + waitAllArrayLimit - 1;
                if (eInd > waitHandles.Length - 1)
                {
                    eInd = waitHandles.Length - 1;
                }
                prevEndInd = eInd;
        
                //do wait
                var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
                WaitHandle.WaitAll(whSubarray);
            }
        
        }
        

        【讨论】:

          【解决方案7】:

          我确实通过简单地对要等待的事件数量进行分页来解决它,而不会损失太多性能,并且它在生产环境中运行良好。遵循代码:

                  var events = new List<ManualResetEvent>();
          
                  // code omited
          
                  var newEvent = new ManualResetEvent(false);
                  events.Add(newEvent);
                  ThreadPool.QueueUserWorkItem(c => {
          
                      //thread code
                      newEvent.Set();
                  });
          
                  // code omited
          
                  var wait = true;
                  while (wait)
                  {
                      WaitHandle.WaitAll(events.Take(60).ToArray());
                      events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
                      wait = events.Any();
          
                  }
          

          【讨论】:

            【解决方案8】:

            这是另一种解决方案。这里的“事件”是ManualResetEvent的列表。列表的大小可以大于 64 (MAX_EVENTS_NO)。

            int len = events.Count;
            if (len <= MAX_EVENTS_NO)
                {
                    WaitHandle.WaitAll(events.ToArray());
                } else {
                    int start = 0;
                    int num = MAX_EVENTS_NO;
                    while (true)
                        {
                            if(start + num > len)
                            {
                               num = len - start;
                            }
                            List<ManualResetEvent> sublist = events.GetRange(start, num);
                            WaitHandle.WaitAll(sublist.ToArray());
                            start += num;
                            if (start >= len)
                               break;
                       }
               }
            
            

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2016-09-03
              • 2016-08-31
              • 1970-01-01
              • 1970-01-01
              • 2018-06-25
              相关资源
              最近更新 更多