【Question title】: Boost thread_group: maximum number of parallel threads
【Posted】: 2014-05-06 06:46:45
【Question】:

I want to use a boost::thread_group with a maximum number of threads in my program. For example:

int maxNumberOfThreads;
boost::thread_group group;
for (int i = 0; i < N; ++i)
    // create a new thread only if group.size() is smaller than the maximum number of threads
    group.create_thread(Worker);
group.join_all();

Does anyone know how I can achieve this?

Starting all N threads at once would be very inefficient.

Thanks for your help.

【Comments】:

    Tags: c++ multithreading boost


    【Answer 1】:

    Here is my (imperfect) implementation:

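    #include <algorithm>
    #include <cassert>
    #include <cstddef>
    #include <set>
    #include <boost/thread.hpp>

    /**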
     * \author Christophe Dumeunier
     * \brief  Extension of boost::thread_group managing a maximum number of threads running in parallel
     */
    class thread_group_max : public boost::thread_group
    {
        public:
            /**
             * \brief  Instantiate a group for threads
             * \param  max_running_threads  Maximum number of threads running in parallel, if 0 use the number of cores
             * \param    max_sleeping_time  Maximum sleeping time (seconds) between two checks for finished threads (must be >= sleeping_time_start)
             * \param   sleeping_time_grow  Coefficient increasing sleeping time while waiting for finished threads (must be > 1)
             * \param  sleeping_time_start  Initial sleeping time in seconds (must be > 0)
             */
            explicit                   thread_group_max(std::size_t max_running_threads = 0, float max_sleeping_time = 1.0f,
                                                        float sleeping_time_grow = 1.1f, float sleeping_time_start = 0.001f);
            /**
             * \brief  Destroy the group
             * \note   Doesn't join the unterminated threads
             */
                                       ~thread_group_max();
    
            /** \brief Wait for an available slot and then create a new thread and launch it */
            template<typename F>
            boost::thread*             create_thread(F f);
    
        private:
            std::size_t                maxRunningThreads;  //!< Maximum number of running threads
            float                      maxSleepingTime;    //!< Maximum sleeping time between two checks for finished threads
            float                      sleepingTimeStart;  //!< Initial sleeping time
            float                      sleepingTimeGrow;   //!< Coefficient increasing sleeping time while waiting for finished threads
            std::set<boost::thread*>   runningThreads;     //!< Pointers to running or finished-but-not-removed-yet threads
    };
    
    thread_group_max::thread_group_max(std::size_t max_running_threads, float max_sleeping_time, float sleeping_time_grow, float sleeping_time_start) :
        boost::thread_group(),
        maxRunningThreads(max_running_threads == 0 ? std::max(boost::thread::hardware_concurrency(), 1u) : max_running_threads),
        maxSleepingTime(max_sleeping_time),
        sleepingTimeStart(sleeping_time_start),
        sleepingTimeGrow(sleeping_time_grow),
        runningThreads()
    {
        assert(this->maxRunningThreads > 0);
        assert(this->maxSleepingTime >= this->sleepingTimeStart);
        assert(this->sleepingTimeStart > 0.0f);
        assert(this->sleepingTimeGrow > 1.0f);
    }
    
    thread_group_max::~thread_group_max()
    {}
    
    template<typename F>
    boost::thread* thread_group_max::create_thread(F f)
    {
        // First, try to clean already finished threads
        if(this->runningThreads.size() >= this->maxRunningThreads)
        {
            for(std::set<boost::thread*>::iterator it = this->runningThreads.begin(); it != this->runningThreads.end();)
            {
                const std::set<boost::thread*>::iterator jt = it++;
                if((*jt)->timed_join(boost::posix_time::milliseconds(0))) /// @todo timed_join is deprecated
                    this->runningThreads.erase(jt);
            }
        }
    
        // If no finished thread found, wait for it
        if(this->runningThreads.size() >= this->maxRunningThreads)
        {
            float sleeping_time = this->sleepingTimeStart;
            do
            {
                boost::this_thread::sleep(boost::posix_time::milliseconds((long int)(1000.0f * sleeping_time)));
                for(std::set<boost::thread*>::iterator it = this->runningThreads.begin(); it != this->runningThreads.end();)
                {
                    const std::set<boost::thread*>::iterator jt = it++;
                    if((*jt)->timed_join(boost::posix_time::milliseconds(0))) /// @todo timed_join is deprecated
                        this->runningThreads.erase(jt);
                }
                if(sleeping_time < this->maxSleepingTime)
                {
                    sleeping_time *= this->sleepingTimeGrow;
                    if(sleeping_time > this->maxSleepingTime)
                        sleeping_time = this->maxSleepingTime;
                }
            } while(this->runningThreads.size() >= this->maxRunningThreads);
        }
    
        // Now, at least 1 slot is available, use it
        return *this->runningThreads.insert(this->boost::thread_group::create_thread(f)).first;
    }
    

    Usage example:

    thread_group_max group(num_threads);
    for(std::size_t i = 0; i < jobs.size(); ++i)
      group.create_thread(boost::bind(&my_run_job_function, boost::ref(jobs[i])));
    group.join_all();
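
The polling loop above wakes up on a timer to look for finished threads. A blocking alternative is to let each worker hand its slot back when it finishes, so the launcher sleeps on a condition variable until a slot is free. The following is only a sketch of that idea and not part of the answer: it assumes C++11 and uses std::thread, std::mutex and std::condition_variable in place of their Boost counterparts, and the names `slot_limiter` and `run_limited` are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: a counting gate that admits at most `max_running` holders.
class slot_limiter
{
    public:
        explicit slot_limiter(std::size_t max_running) : free_slots(max_running)
        {
            assert(max_running > 0);
        }

        // Block until a slot is free, then take it.
        void acquire()
        {
            std::unique_lock<std::mutex> lk(mx);
            cv.wait(lk, [this] { return free_slots > 0; });
            --free_slots;
        }

        // Give the slot back and wake one waiting launcher.
        void release()
        {
            {
                std::lock_guard<std::mutex> lk(mx);
                ++free_slots;
            }
            cv.notify_one();
        }

    private:
        std::mutex              mx;
        std::condition_variable cv;
        std::size_t             free_slots;
};

// Hypothetical helper: run every job, never more than `max_running` at once.
// Returns the number of threads that were started.
inline std::size_t run_limited(const std::vector<std::function<void()> >& jobs,
                               std::size_t max_running)
{
    slot_limiter limiter(max_running);
    std::vector<std::thread> threads;
    threads.reserve(jobs.size());

    for (std::size_t i = 0; i < jobs.size(); ++i)
    {
        limiter.acquire();  // waits here while max_running jobs are active
        const std::function<void()>& job = jobs[i];
        threads.emplace_back([&limiter, &job] {
            job();
            limiter.release();  // the slot is freed as soon as the job returns
        });
    }

    for (std::size_t i = 0; i < threads.size(); ++i)
        threads[i].join();
    return threads.size();
}
```

This still creates one thread per job, as the class above does; it only replaces the timed polling with a wait. A job that throws would terminate the program, as with any exception escaping a std::thread, so real code would catch inside the lambda.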
    

    【Comments】:

      【Answer 2】:

      What you seem to want is a thread pool.

      You can use boost::thread::hardware_concurrency() to determine the number of (logical) cores available on your particular system.
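
Note that hardware_concurrency() is only a hint: it returns 0 when the value cannot be determined, so it is worth clamping the result before using it as a pool size. A minimal sketch, assuming C++11 std::thread (boost::thread::hardware_concurrency() is documented to behave the same way); `pick_worker_count` is a hypothetical helper name:

```cpp
#include <algorithm>
#include <cassert>
#include <thread>

// Hypothetical helper: choose how many worker threads to start.
// `requested == 0` means "use the number of logical cores".
inline unsigned pick_worker_count(unsigned requested = 0)
{
    if (requested != 0)
        return requested;

    // hardware_concurrency() may return 0 if the value is not available.
    const unsigned detected = std::thread::hardware_concurrency();
    const unsigned chosen = std::max(detected, 1u);
    assert(chosen >= 1);
    return chosen;
}
```

Without such a clamp, a pool whose constructor loops up to hardware_concurrency() would start no workers on a system where the call returns 0.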

      Here is an answer I gave to a question last week:

      #include <boost/atomic.hpp>
      #include <boost/thread.hpp>
      #include <boost/phoenix.hpp>
      #include <boost/optional.hpp>
      #include <deque>
      #include <iostream>
      
      using namespace boost;
      using namespace boost::phoenix::arg_names;
      
      boost::atomic_size_t counter(0ul);
      
      class thread_pool
      {
        private:
            mutex mx;
            condition_variable cv;
      
            typedef function<void()> job_t;
            std::deque<job_t> _queue;
      
            thread_group pool;
      
            boost::atomic_bool shutdown;
            static void worker_thread(thread_pool& q)
            {
                while (optional<job_t> job = q.dequeue())
                    (*job)();
            }
      
        public:
            thread_pool() : shutdown(false) {
                for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
                    pool.create_thread(bind(worker_thread, ref(*this)));
            }
      
            void enqueue(job_t job) 
            {
                lock_guard<mutex> lk(mx);
                _queue.push_back(job);
      
                cv.notify_one();
            }
      
            optional<job_t> dequeue() 
            {
                unique_lock<mutex> lk(mx);
                namespace phx = boost::phoenix;
      
                cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
      
                if (_queue.empty())
                    return none;
      
                job_t job = _queue.front();
                _queue.pop_front();
      
                return job;
            }
      
            ~thread_pool()
            {
                shutdown = true;
                {
                    lock_guard<mutex> lk(mx);
                    cv.notify_all();
                }
      
                pool.join_all();
            }
      };
      

      A typical way to use it is also shown in that answer:

      static const size_t bignumber = 1 << 20;
      
      class myClass 
      {
          thread_pool pool; // uses 1 thread per core
      
        public:
          void launch_jobs()
          {
              std::cout << "enqueuing jobs... " << std::flush;
              for(size_t i=0; i<bignumber; ++i)
              {
                  for(int j=0; j<2; ++j) {
                      pool.enqueue(bind(&myClass::myFunction, this, j, i));
                  }     
              }
              std::cout << "done\n";
          }
      
        private:
          void myFunction(int i, int j)
          {
              boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
              counter += 1;
          }
      };
      
      int main()
      {
          myClass instance;
          instance.launch_jobs();
      
          size_t last = 0;
          while (counter < (2*bignumber))
          {
              boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
              if ((counter >> 4u) > last)
              {
                  std::cout << "Progress: " << counter << "/" << (bignumber*2) << "\n";
                  last = counter >> 4u;
              }
          }
      }
      

      As a bonus, in the comments on another answer to that question, I also posted an equivalent solution based on a lock-free job queue implementation.

      【Comments】:

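      • Thanks for the great answer. I had read that question about the thread resource error, but I did not realize that this thread pool is also a solution to my problem. I am currently trying to get the thread_pool class running, but I have some problems with std::move and auto. Is this only for C++11?
      • Yes. I have now updated the code in this answer for C++03. The C++11 version may be slightly more efficient. Cheers
      • Thanks for your help. Should it be possible to add a function waitTillJobsAreDone to the thread_pool class? Inside that function I would only need to call pool.join_all(); that should block until all jobs are finished, right?
      • OK, I think I misunderstood the architecture. This thread pool keeps running and only ends when the destructor is called, right? Or is it possible to shut it down? For example, I have 10 jobs to do and I want to wait for their results. After that I want to shut down my thread pool.
      • You have a strange way of phrasing questions :) Assuming you want to know whether you can do these things: you can do what you need, but not in the way you suggest. 1. You can wait for the queue to be empty(). 2. The destructor already does this, but correctly :) See the is_idle() and wait_idle() implementations here (fixed), and be sure to read (and think about!) the comments. Designing thread-aware interfaces is not that simple :)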
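
The last comment mentions is_idle() and wait_idle(). Calling pool.join_all() cannot serve as a wait, because the workers only return once shutdown is set. One way to block until the work is done while keeping the pool alive is to count unfinished jobs and signal a second condition variable when the count reaches zero. The sketch below is not the implementation that comment links to; it is an illustration written with C++11 std:: primitives, and `idle_aware_pool` is a hypothetical name.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch: fixed-size pool that can block until all queued work is done.
class idle_aware_pool
{
    public:
        explicit idle_aware_pool(std::size_t workers) : unfinished(0), shutdown(false)
        {
            assert(workers > 0);
            for (std::size_t i = 0; i < workers; ++i)
                threads.emplace_back([this] { work(); });
        }

        // Lets the workers drain the queue, then joins them.
        ~idle_aware_pool()
        {
            {
                std::lock_guard<std::mutex> lk(mx);
                shutdown = true;
            }
            job_ready.notify_all();
            for (std::size_t i = 0; i < threads.size(); ++i)
                threads[i].join();
        }

        void enqueue(std::function<void()> job)
        {
            {
                std::lock_guard<std::mutex> lk(mx);
                queue.push_back(std::move(job));
                ++unfinished;  // counts queued and currently running jobs
            }
            job_ready.notify_one();
        }

        // Block until every job enqueued so far has finished running.
        void wait_idle()
        {
            std::unique_lock<std::mutex> lk(mx);
            idle.wait(lk, [this] { return unfinished == 0; });
        }

    private:
        void work()
        {
            for (;;)
            {
                std::function<void()> job;
                {
                    std::unique_lock<std::mutex> lk(mx);
                    job_ready.wait(lk, [this] { return shutdown || !queue.empty(); });
                    if (queue.empty())
                        return;  // shutdown requested and nothing left to run
                    job = std::move(queue.front());
                    queue.pop_front();
                }
                job();
                {
                    std::lock_guard<std::mutex> lk(mx);
                    --unfinished;
                }
                idle.notify_all();
            }
        }

        std::mutex                         mx;
        std::condition_variable            job_ready;
        std::condition_variable            idle;
        std::deque<std::function<void()> > queue;
        std::size_t                        unfinished;
        bool                               shutdown;
        std::vector<std::thread>           threads;  // declared last: started after the other members exist
};
```

Counting unfinished jobs rather than testing whether the queue is empty matters here: an empty queue only means every job has been picked up, not that every job has completed.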