【问题标题】:C# MultithreadingC# 多线程
【发布时间】:2011-05-28 21:25:27
【问题描述】:

假设我有一个事件每秒触发 10 次

void Session_OnEvent(object sender, CustomEventArgs e)
{
  //DoStuff

  DoLongOperation(e);
}

我想要方法 DoLongOperation(e);每次触发事件时在单独的线程上进行处理,

我可以这样做:

new Thread(DoLongOperation).Start(e);

但是我觉得这对性能不好,我想达到最好的性能,那么我能做的最好的事情是什么?

感谢 idvance..

编辑:当我说很长时间时,我并不是说最多需要超过 1 秒的操作,只是我不希望事件等待那个时间,所以我想在单独的线程中进行......

【问题讨论】:

  • 你能做的最好的事情:测量。
  • 你的意思是要取消之前的LongOperation。或者您想对并行 LongOperation 设置上限?
  • 使用线程会使现代内核上可用的 CPU 周期数翻倍,这对性能有好处。但是程序中未诊断的错误数量会增加四倍,这对发布日期不利。

标签: c# multithreading


【解决方案1】:

是的,您可以这样做。但是当您的事件每秒触发 10 次并且您每秒启动 10 次长时间运行的操作时,您将很快耗尽线程。

【讨论】:

    【解决方案2】:

    您的问题的直接答案是:使用the managed thread pool 通过利用ThreadPool.QueueUserWorkItem 将您的操作推送到它。 (您可能想看看问题“when do I use the thread pool vs. my own threads?”的答案)。

    但是,放眼大局:如果您开始的所有操作都需要超过 100 毫秒才能完成,那么从数学上讲,您将产生比您可以处理的更多的工作。无论您如何切片,这都不会很好地结束。例如,如果您每次都创建一个单独的线程,那么您的进程将用完线程,如果您使用线程池,那么您将淹没它永远无法完成的工作,等等。

    如果只有一些您的操作最终会很长,并且大部分操作会立即完成,那么您可能有机会找到一个实用的解决方案。否则,您需要重新考虑您的程序设计。

    【讨论】:

    • 我不同意。对于每秒触发多次的事件,使用线程池管理线程将导致显着减速,因为线程池管理线程计数和创建新线程的方式。我认为在这种情况下,管理自己的线程会更好,但是我会对工作线程的数量设置上限,也许使用互锁系统来跟踪线程数。
    • 如果没有更多信息,您怎么知道它会导致“显着放缓”?我们甚至不知道什么对 OP 具​​有重要意义。此外,盲目地限制线程数量(或盲目地做任何事情,就此而言)肯定不会有好的结果。正如我所说,OP 很可能需要重新考虑他们的设计。
    【解决方案3】:

    性能在很大程度上取决于几个因素:

    • 一次将运行多少个线程?
    • 他们会做什么?
    • 它们将运行多长时间? (最短执行时间、最长、平均)
    • 如果其中一个线程异常终止会发生什么?

    每秒十次是相当高的活动率。根据执行的持续时间,使用单独的进程(如服务)可能更有意义。该活动显然必须是线程安全的,这意味着(部分)没有资源争用。如果两个线程可能需要更新相同的资源(文件、内存位置),则需要使用锁定。如果处理不当,这可能会影响效率。

    【讨论】:

      【解决方案4】:

      使用一个线程来处理您的请求,并从您的事件中为线程排队工作项。

      具体来说:

      • 复制电子版
      • 创建 List 并将副本插入到末尾
      • 从线程和事件同步访问该列表

      作为类的成员对象,这样做:

      List< CustomEventArgs > _argsqueue;
      Thread _processor;
      

      在类的构造函数中,做:

      _argsqueue=new List< CustomEventArgs >();
      _processor=new Thread(ProcessorMethod);
      

      定义处理器方法:

      void ProcessorMethod()
      {
           while (_shouldEnd)
           {
               CustomEventArgs e=null;
               lock (_argsqueue)
               {
                   if (_argsqueue.Count>0)
                   {
                       CustomEventArgs e=_argsqueue[0];
                       _argsqueue.RemoveAt(0);
                   }
               }
               if (e!=null) 
               {
                   DoLongOperation(e);
               }
               else
               {
                   Sleep(100);
               }
           }
      }
      

      在你的活动中:

      lock (_argsqueue)
      {
          _argsqueue.Add(e.Clone());
      }
      

      您必须自己制定细节,例如,在关闭表单或处理有问题的对象时,您必须:

      _shouldEnd=true;
      _processor.Join();
      

      【讨论】:

      • 这里,非常简单的例子。 :)
      【解决方案5】:

      如果您使用的是 C# 4.0,您可能需要考虑使用 task scheduler。由于您的 DoLongOperation 意味着它将长期运行,您应该考虑以下事项

      长时间运行的任务

      您可能希望明确阻止 任务被放入本地队列。 例如,您可能知道一个 特定的工作项将运行一个 时间相对较长,可能 阻止所有其他工作项 本地队列。在这种情况下,您可以 指定 LongRunning 选项,该选项 向调度程序提供一个提示 可能需要一个额外的线程 任务,使其不会阻塞 其他线程的前进进度 或本地队列上的工作项。经过 使用此选项,您可以避免 ThreadPool 完全包括 全局和本地队列。

      使用TaskScheduler 的另一个好处是它具有MaximumConcurrencyLevel。这使您可以在完成 Jon 推荐的测试后相对轻松地调整并发性。

      这是来自MSDN 的示例

      namespace System.Threading.Tasks.Schedulers
      {
      
          using System;
          using System.Collections.Generic;
          using System.Linq;
          using System.Threading;
      
          class Program
          {
              static void Main()
              {
                  LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
                  TaskFactory factory = new TaskFactory(lcts);
      
                  factory.StartNew(()=> 
                      {
                          for (int i = 0; i < 500; i++)
                          {
                              Console.Write("{0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId);
                          }
                      }
                  );
      
                  Console.ReadKey();
              }
          }
      
          /// <summary>
          /// Provides a task scheduler that ensures a maximum concurrency level while
          /// running on top of the ThreadPool.
          /// </summary>
          public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
          {
              /// <summary>Whether the current thread is processing work items.</summary>
              [ThreadStatic]
              private static bool _currentThreadIsProcessingItems;
              /// <summary>The list of tasks to be executed.</summary>
              private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
              /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
              private readonly int _maxDegreeOfParallelism;
              /// <summary>Whether the scheduler is currently processing work items.</summary>
              private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
      
              /// <summary>
              /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
              /// specified degree of parallelism.
              /// </summary>
              /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
              public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
              {
                  if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
                  _maxDegreeOfParallelism = maxDegreeOfParallelism;
              }
      
              /// <summary>Queues a task to the scheduler.</summary>
              /// <param name="task">The task to be queued.</param>
              protected sealed override void QueueTask(Task task)
              {
                  // Add the task to the list of tasks to be processed.  If there aren't enough
                  // delegates currently queued or running to process tasks, schedule another.
                  lock (_tasks)
                  {
                      _tasks.AddLast(task);
                      if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                      {
                          ++_delegatesQueuedOrRunning;
                          NotifyThreadPoolOfPendingWork();
                      }
                  }
              }
      
              /// <summary>
              /// Informs the ThreadPool that there's work to be executed for this scheduler.
              /// </summary>
              private void NotifyThreadPoolOfPendingWork()
              {
                  ThreadPool.UnsafeQueueUserWorkItem(_ =>
                  {
                      // Note that the current thread is now processing work items.
                      // This is necessary to enable inlining of tasks into this thread.
                      _currentThreadIsProcessingItems = true;
                      try
                      {
                          // Process all available items in the queue.
                          while (true)
                          {
                              Task item;
                              lock (_tasks)
                              {
                                  // When there are no more items to be processed,
                                  // note that we're done processing, and get out.
                                  if (_tasks.Count == 0)
                                  {
                                      --_delegatesQueuedOrRunning;
                                      break;
                                  }
      
                                  // Get the next item from the queue
                                  item = _tasks.First.Value;
                                  _tasks.RemoveFirst();
                              }
      
                              // Execute the task we pulled out of the queue
                              base.TryExecuteTask(item);
                          }
                      }
                      // We're done processing items on the current thread
                      finally { _currentThreadIsProcessingItems = false; }
                  }, null);
              }
      
              /// <summary>Attempts to execute the specified task on the current thread.</summary>
              /// <param name="task">The task to be executed.</param>
              /// <param name="taskWasPreviouslyQueued"></param>
              /// <returns>Whether the task could be executed on the current thread.</returns>
              protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
              {
                  // If this thread isn't already processing a task, we don't support inlining
                  if (!_currentThreadIsProcessingItems) return false;
      
                  // If the task was previously queued, remove it from the queue
                  if (taskWasPreviouslyQueued) TryDequeue(task);
      
                  // Try to run the task.
                  return base.TryExecuteTask(task);
              }
      
              /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
              /// <param name="task">The task to be removed.</param>
              /// <returns>Whether the task could be found and removed.</returns>
              protected sealed override bool TryDequeue(Task task)
              {
                  lock (_tasks) return _tasks.Remove(task);
              }
      
              /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
              public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
      
              /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
              /// <returns>An enumerable of the tasks currently scheduled.</returns>
              protected sealed override IEnumerable<Task> GetScheduledTasks()
              {
                  bool lockTaken = false;
                  try
                  {
                      Monitor.TryEnter(_tasks, ref lockTaken);
                      if (lockTaken) return _tasks.ToArray();
                      else throw new NotSupportedException();
                  }
                  finally
                  {
                      if (lockTaken) Monitor.Exit(_tasks);
                  }
              }
          }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多