【问题标题】:How to make boost::thread_group execute a fixed number of parallel threads如何让 boost::thread_group 执行固定数量的并行线程
【发布时间】:2011-03-21 14:49:11
【问题描述】:

这是创建thread_group并并行执行所有线程的代码:

boost::thread_group group;
for (int i = 0; i < 15; ++i)
    group.create_thread(aFunctionToExecute);
group.join_all();

此代码将一次执行所有线程。我想要做的是将它们全部并行执行,但最多并行执行 4 个。当 on 终止时,会执行另一个,直到没有更多可以执行。

【问题讨论】:

    标签: c++ multithreading boost boost-thread


    【解决方案1】:

    另一个更有效的解决方案是让每个线程在完成后回调到主线程,并且主线程上的处理程序每​​次都可以启动一个新线程。这可以防止对 timed_join 的重复调用,因为在触发回调之前主线程不会做任何事情。

    【讨论】:

    • 最后得到这样的结果:我有一个线程池,我在其中注册所有作业。然后,我创建 n 个线程并将线程池作为参数传递给每个线程。每个线程检查是否还有作业。如果是,只需执行一项工作即可。否则,线程结束。这样,我们只创建 n 个线程,而不是每个作业一个线程(作业结束,创建一个新线程)。
    【解决方案2】:

    我有这样的事情:

        boost::mutex mutex_;
        boost::condition_variable condition_;
        const size_t throttle_;
        size_t size_;
        bool wait_;
        template <typename Env, class F>
        void eval_(const Env &env, const F &f) {
            {   
                boost::unique_lock<boost::mutex> lock(mutex_);
                size_ = std::min(size_+1, throttle_);
                while (throttle_ <= size_) condition_.wait(lock);
            }
            f.eval(env);
            {
                boost::lock_guard<boost::mutex> lock(mutex_);
                --size_; 
            }
            condition_.notify_one();
        }
    

    【讨论】:

      【解决方案3】:

      我认为您正在寻找thread_pool 实现,它可以在here 获得。

      另外我注意到,如果您创建一个 std::future 向量并在其中存储许多 std::async_tasks 的未来,并且您在传递给线程的函数中没有任何阻塞代码,VS2013(至少从什么我可以确认)将准确启动您的机器可以处理的适当线程数。它会重用创建后的线程。

      【讨论】:

        【解决方案4】:

        我创建了自己的简化界面boost::thread_group 来完成这项工作:

        class ThreadGroup : public boost::noncopyable
        {
            private:
                boost::thread_group        group;
                std::size_t                maxSize;
                float                      sleepStart;
                float                      sleepCoef;
                float                      sleepMax;
                std::set<boost::thread*>   running;
        
            public:
                ThreadGroup(std::size_t max_size = 0,
                            float max_sleeping_time = 1.0f,
                            float sleeping_time_coef = 1.5f,
                            float sleeping_time_start = 0.001f) :
                    boost::noncopyable(),
                    group(),
                    maxSize(max_size),
                    sleepStart(sleeping_time_start),
                    sleepCoef(sleeping_time_coef),
                    sleepMax(max_sleeping_time),
                    running()
                {
                    if(max_size == 0)
                        this->maxSize = (std::size_t)std::max(boost::thread::hardware_concurrency(), 1u);
                    assert(max_sleeping_time >= sleeping_time_start);
                    assert(sleeping_time_start > 0.0f);
                    assert(sleeping_time_coef > 1.0f);
                }
        
                ~ThreadGroup()
                {
                    this->joinAll();
                }
        
                template<typename F> boost::thread* createThread(F f)
                {
                    float sleeping_time = this->sleepStart;
                    while(this->running.size() >= this->maxSize)
                    {
                        for(std::set<boost::thread*>::iterator it = running.begin(); it != running.end();)
                        {
                            const std::set<boost::thread*>::iterator jt = it++;
                            if((*jt)->timed_join(boost::posix_time::milliseconds((long int)(1000.0f * sleeping_time))))
                                running.erase(jt);
                        }
                        if(sleeping_time < this->sleepMax)
                        {
                            sleeping_time *= this->sleepCoef;
                            if(sleeping_time > this->sleepMax)
                                sleeping_time = this->sleepMax;
                        }
                    }
                    return *this->running.insert(this->group.create_thread(f)).first;
                }
        
                void joinAll()
                {
                    this->group.join_all();
                }
        
                void interruptAll()
                {
        #ifdef BOOST_THREAD_PROVIDES_INTERRUPTIONS
                    this->group.interrupt_all();
        #endif
                }
        
                std::size_t size() const
                {
                    return this->group.size();
                }
            };
        

        这里是一个使用示例,与boost::thread_group非常相似,主要区别在于线程的创建是一个等待点:

        {
          ThreadGroup group(4);
          for(int i = 0; i < 15; ++i)
            group.createThread(aFunctionToExecute);
        } // join all at destruction
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2011-11-05
          • 1970-01-01
          • 2016-06-06
          • 1970-01-01
          相关资源
          最近更新 更多