【发布时间】:2016-02-23 05:29:34
【问题描述】:
我有一个简单的线程对象,它负责一些执行(工作人员):
在最简单的形式中,为每个线程创建一个对象:
class worker
{
public:
worker (
boost::atomic<int> & threads,
boost::mutex & mutex,
boost::condition_variable & condition
)
: threads__(threads), mutex__(mutex), condition__(condition)
{}
void run (
// some params
)
{
// ... do the threaded work here
// finally, decrease number of running threads and notify
boost::mutex::scoped_lock lock(mutex__);
threads__--;
condition__.notify_one();
}
private:
boost::atomic<int> & threads__;
boost::mutex & mutex__;
boost::condition_variable & condition__;
};
我使用它的方式是在一个循环中运行最多 8 个并发线程,如果一个线程完成则等待通知,以便生成下一个线程:
boost::thread_group thread_group;
boost::mutex mutex;
boost::condition_variable condition;
boost::atomic<int> threads(0);
// Some loop which can be parallelised
for ( const auto & x : list )
{
// wait if thread_count exceeds 8 threads
boost::mutex::scoped_lock lock(mutex);
while ( threads >= 8 )
condition.wait( lock );
// Create worker object
worker _wrk_( threads, mutex, condition );
boost::thread * thread = new boost::thread( &worker::run, &_wrk_, /* other params */ );
thread_group.add_thread( thread );
threads++;
}
这适用于我的大多数场景,但现在我有一个需要重复使用的线程对象。
原因很简单:这个tread 对象包含thrust::device_vector<float>,当对象被删除时(重新)分配的代价很高。
此外,这些向量可以重复使用,因为它们的大部分内容不会改变。
因此,我正在寻找一种可以重用在循环中创建的对象的机制——事实上,我将预先分配 8 个这些对象(或与我的并发线程一样多),然后再使用它们一遍又一遍。 我希望可以做到的事情是这样的:
boost::thread_group thread_group;
boost::mutex mutex;
boost::condition_variable condition;
boost::atomic<int> threads(0);
// our worker objects to be reused
std::vector<std::shared_ptr<worker>>workers(8,std::make_shared<worker>(threads,mutex,condition));
// Some loop which can be parallelised
for ( const auto & x : list )
{
// wait if thread_count exceeds 8 threads
boost::mutex::scoped_lock lock(mutex);
while ( threads >= 8 )
condition.wait( lock );
// get next available thread object from the vector
auto _wrk_ = std::find_if(workers.begin(), workers.end(), is_available() );
// if we have less than 8 threads but no available thread object
if ( _wrk_ == workers.end() ) throw std::runtime_error ("...");
// Use the first available worker object for this thread
boost::thread * thread = new boost::thread(&worker::run, &(*_wrk_));
thread_group.add_thread( thread );
threads++;
}
我不知道如何向 is_available() 发出信号,除了将它实现为(工作类的)类方法。
其次,在我看来,这无缘无故太复杂了,我确信必须有某种其他模式可以使用,它更简单和/或优雅。
【问题讨论】:
-
为什么不使用已经可用的线程池?
-
@SergeyA 你的意思是一个提升线程组作为一个池?
-
我不熟悉 boost::thread_group,但从您的使用来看,您似乎没有将它用作线程池 - 您正在为每个请求创建一个新线程。我的意思是使用经典线程池,当您在开始时启动预定义数量的线程时,将它们连接到消息队列并将“工作”请求发布到该队列以由第一个可用线程拾取。
-
@SergeyA 我不知道该怎么做。你是对的,我正在创建新线程。每个线程的参数都会改变,但对象保持不变。这些对象复制起来非常昂贵(它们会进行主机设备内存复制)。你能提供一个简化的例子来说明你的意思吗?我应该使用这样的线程池吗:stackoverflow.com/questions/12215395/…
-
这对我来说看起来就像线程池。您可以谷歌线程池 - 这是一个非常广泛使用的概念,它应该非常适用于您的情况。
标签: c++ multithreading boost