【问题标题】:What is wrong with my custom thread pool?我的自定义线程池有什么问题?
【发布时间】:2009-01-19 13:24:54
【问题描述】:

我创建了一个自定义线程池实用程序,但似乎有一个我找不到的问题。

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace iWallpaper.S3Uploader
{
public class QueueManager<T>
{
    private readonly Queue queue = Queue.Synchronized(new Queue());
    private readonly AutoResetEvent res = new AutoResetEvent(true);
    private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
    private readonly Semaphore sem = new Semaphore(1, 4);
    private readonly Thread thread;
    private Action<T> DoWork;
    private int Num_Of_Threads;

    private QueueManager()
    {
        Num_Of_Threads = 0;
        maxThread = 5;
        thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
        thread.Start();

        //   log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
    }

    public int maxThread { get; set; }

    public static FileUploadQueueManager<T> Instance
    {
        get { return Nested.instance; }
    }

    /// <summary>
    /// Executes multythreaded operation under items
    /// </summary>
    /// <param name="list">List of items to proceed</param>
    /// <param name="action">Action under item</param>
    /// <param name="MaxThreads">Maximum threads</param>
    public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        maxThread = MaxThreads;
        DoWork = action;
        foreach (T item in list)
        {
            Add(item);
        }
    }
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
    {
        ExecuteNoThread(list, action, 0);
    }
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        foreach (T wallpaper in list)
        {
            action(wallpaper);
        }
    }
    /// <summary>
    /// Default 10 threads
    /// </summary>
    /// <param name="list"></param>
    /// <param name="action"></param>
    public void Execute(IEnumerable<T> list, Action<T> action)
    {
        Execute(list, action, 10);
    }

    private void Add(T item)
    {
        lock (queue)
        {
            queue.Enqueue(item);
        }
        res.Set();
    }

    private void Worker()
    {
        while (true)
        {
            if (queue.Count == 0)
            {
                res.WaitOne();
            }

            if (Num_Of_Threads < maxThread)
            {
                var t = new Thread(Proceed);
                t.Start();
            }
            else
            {
                res_thr.WaitOne();
            }
        }
    }

    private void Proceed()
    {
        Interlocked.Increment(ref Num_Of_Threads);
        if (queue.Count > 0)
        {
            var item = (T) queue.Dequeue();

            sem.WaitOne();
            ProceedItem(item);
            sem.Release();
        }
        res_thr.Set();
        Interlocked.Decrement(ref Num_Of_Threads);
    }

    private void ProceedItem(T activity)
    {
        if (DoWork != null)
            DoWork(activity);

        lock (Instance)
        {
            Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                          thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                          Num_Of_Threads);
        }
    }

    #region Nested type: Nested

    protected class Nested
    {
        // Explicit static constructor to tell C# compiler
        // not to mark type as beforefieldinit
        internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>();
    }

    #endregion

}

}

问题来了:

Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                      thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                      Num_Of_Threads);

标题中总是有一个线程ID。并且程序似乎在一个线程中运行。

示例用法:

        var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
        QueueManager<int>.Instance.Execute(i_list,
          i =>
          {
              Console.WriteLine("Some action under element number {0}", i);

          }, 5);

P.S.:这很混乱,但我仍在努力。

【问题讨论】:

  • 为什么不直接使用 .NET 中内置的线程池类?

标签: c# .net multithreading lambda


【解决方案1】:

我查看了您的代码,发现了几个问题。

  1. 您锁定队列对象,即使它是同步队列。这是不必要的
  2. 您不一致地锁定队列对象。它应该为每次访问锁定或不锁定,具体取决于同步行为。
  3. Proceed 方法不是线程安全的。这两行是问题

    if (queue.Count > 0) { var item = (T)queue.Dequeue(); ... }

    使用同步队列只能保证个人访问是安全的。所以 .Count 和 .Dequeue 方法都不会弄乱队列的内部结构。然而,想象一下两个线程同时运行这些代码行的场景,队列的计数为 1

    • 线程 1:如果 (...) -> 真
    • 线程 2:如果 (...) -> 真
    • 线程 1:出队 -> 成功
    • Thread2: dequeue -> 因为队列为空而失败
  4. Worker 和 Proceed 之间存在可能导致死锁的竞争条件。下面两行代码要切换一下。

    代码:

    res_thr.Set() Interlocked.Decrement(ref Num_Of_Threads);

    第一行将解除对 Worker 方法的阻塞。如果它运行得足够快,它将返回查看,注意 Num_Of_Threads

  5. maxThread 计数属性似乎没有超过 4 个有用。sem 对象被初始化为只接受最多 4 个并发条目。实际执行一个项目的所有代码都必须经过这个信号量。因此,无论 maxThread 设置多高,您都有效地将最大并发项目数限制为 4。

【讨论】:

    【解决方案2】:

    编写健壮的线程代码并非易事。有许多线程池可供您参考,但也请注意并行扩展(以 CTP 或更高版本在 .NET 4.0 中提供)包括许多开箱即用的附加线程构造(在 TPL/CCR 中)。例如,Parallel.For / Parallel.ForEach,它们处理工作窃取,并有效地处理可用内核。

    有关预滚动线程池的示例,请参阅 Jon Skeet 的 CustomThreadPool here

    【讨论】:

    • 感谢有用的链接和导航^_^
    【解决方案3】:

    我认为你可以很简单的事情。

    这是我使用的线程池的修改形式(我没有测试修改):

    唯一的同步。您需要的原语是一个监视器,锁定在线程池上。您不需要信号量或重置事件。

    internal class ThreadPool
    {
        private readonly Thread[] m_threads;
        private readonly Queue<Action> m_queue;
        private bool m_shutdown;
        private object m_lockObj;
    
    
        public ThreadPool(int numberOfThreads)
        {
            Util.Assume(numberOfThreads > 0, "Invalid thread count!");
            m_queue = new Queue<Action>();
            m_threads = new Thread[numberOfThreads];
            m_lockObj = new object();
    
            lock (m_lockObj)
            {
                for (int i = 0; i < numberOfWriteThreads; ++i)
                {
                    m_threads[i] = new Thread(ThreadLoop);
                    m_threads[i].Start();
                }
            }
    
        }
    
        public void Shutdown()
        {
            lock (m_lockObj)
            {
                m_shutdown = true;
                Monitor.PulseAll(m_lockObj);
    
                if (OnShuttingDown != null)
                {
                    OnShuttingDown();
                }
            }
            foreach (var thread in m_threads)
            {
                thread.Join();
            }
        }
        public void Enqueue(Action a)
        {
            lock (m_lockObj)
            {
                m_queue.Enqueue(a);
                Monitor.Pulse(m_lockObj);
            }
        }
    
        private void ThreadLoop()
        {
            Monitor.Enter(m_lockObj);
    
            while (!m_shutdown)
            {
                if (m_queue.Count == 0)
                {
                    Monitor.Wait(m_lockObj);
                }
                else
                {
                    var a = m_queue.Dequeue();
                    Monitor.Pulse(m_lockObj);
                    Monitor.Exit(m_lockObj);
                    try
                    {
                        a();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("An unhandled exception occured!\n:{0}", ex.Message, null);
                    }
                    Monitor.Enter(m_lockObj);
                }
            }
    
            Monitor.Exit(m_lockObj);
        }
    }
    

    【讨论】:

    • 永远不要锁定“这个”:)。否则我可以通过外部锁定来搞乱你的算法。是的,这样做是非常糟糕的做法,但是您看到人们锁定实例对象有多少次?
    • 我明白你的意思。您也可以将被锁定的对象更改为内部字段,这将得到修复。对我来说,这不是问题,因为我控制了线程池的所有使用。对于一般代码或 700K LOC 项目,我可以看出这是个问题。我会更新帖子。
    • 存在一个潜在的写入顺序问题。即使 m_shutdown 在 Monitor.PulseAll() 运行之前设置,也不能保证此写入将在所有线程上可见。因此,这可能会在关闭期间导致死锁,因为 ThreadLoop 会直接返回等待。
    • 我在这一点上不是 100% 确定的。不知道 PulseAll() 是否引入了内存屏障。
    • 是的。你说的对。正确的行为是在那里使用 Interlocked.Exchange
    【解决方案4】:

    您可能应该使用内置的线程池。运行您的代码时,我注意到您启动了一堆线程,但由于队列计数

    【讨论】:

    • 内置线程池如果不是静态类就好了,可以做多个线程池对象。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-09-27
    相关资源
    最近更新 更多