这是mass_thread_pool:
// launches n threads all doing task F with an index:
template<class F>
struct mass_thread_pool {
F f;
std::vector< std::thread > threads;
std::condition_variable cv;
std::mutex m;
size_t task_id = 0;
size_t finished_count = 0;
std::unique_ptr<std::promise<void>> task_done;
std::atomic<bool> finished;
void task( F f, size_t n, size_t cur_task ) {
//std::cout << "Thread " << n << " launched" << std::endl;
do {
f(n);
std::unique_lock<std::mutex> lock(m);
if (finished)
break;
++finished_count;
if (finished_count == threads.size())
{
//std::cout << "task set finished" << std::endl;
task_done->set_value();
finished_count = 0;
}
cv.wait(lock,[&]{if (finished) return true; if (cur_task == task_id) return false; cur_task=task_id; return true;});
} while(!finished);
//std::cout << finished << std::endl;
//std::cout << "Thread " << n << " finished" << std::endl;
}
mass_thread_pool() = delete;
mass_thread_pool(F fin):f(fin),finished(false) {}
mass_thread_pool(mass_thread_pool&&)=delete; // address is party of identity
std::future<void> kick( size_t n ) {
//std::cout << "kicking " << n << " threads off. Prior count is " << threads.size() << std::endl;
std::future<void> r;
{
std::unique_lock<std::mutex> lock(m);
++task_id;
task_done.reset( new std::promise<void>() );
finished_count = 0;
r = task_done->get_future();
while (threads.size() < n) {
size_t i = threads.size();
threads.emplace_back( &mass_thread_pool::task, this, f, i, task_id );
}
//std::cout << "count is now " << threads.size() << std::endl;
}
cv.notify_all();
return r;
}
~mass_thread_pool() {
//std::cout << "destroying thread pool" << std::endl;
finished = true;
cv.notify_all();
for (auto&& t:threads) {
//std::cout << "joining thread" << std::endl;
t.join();
}
//std::cout << "destroyed thread pool" << std::endl;
}
};
你用一个任务构建它,然后你 kick(77) 启动该任务的 77 个副本(每个副本都有不同的索引)。
kick 返回一个std::future<void>。您必须等待所有任务完成。
然后你可以要么销毁线程池,要么再次调用kick(77)重新启动任务。
这个想法是,您传递给mass_thread_pool 的函数对象可以访问您的输入和输出数据(例如,您要相乘的矩阵或指向它们的指针)。每个kick 都会导致它为每个索引调用一次您的函数。您负责将索引转换为任何偏移量。
Live example 我用它来为另一个vector 中的条目加1。在迭代之间,我们交换向量。这会进行 2000 次迭代,并启动 10 个线程,并调用 lambda 20000 次。
注意auto&& pool = make_pool( lambda ) 位。需要使用auto&&——因为线程池有指向自身的指针,所以我在大量线程池上禁用了移动和复制构造。如果您确实需要传递它,请创建一个指向线程池的唯一指针。
我在std::promise 重置时遇到了一些问题,所以我将它包装在一个unique_ptr 中。这可能不是必需的。
我用来调试它的跟踪语句被注释掉了。
用不同的n 调用kick 可能会也可能不会。绝对用较小的n 调用它不会像您期望的那样工作(在这种情况下它将忽略n)。
在您致电kick 之前,不会进行任何处理。 kick 是“开球”的缩写。
...
对于您的问题,我要做的是创建一个拥有mass_thread_pool 的多重对象。
乘数有一个指向 3 个矩阵的指针(a、b 和 out)。 n 个子任务中的每一个都会生成out 的一些小节。
您将 2 个矩阵传递给乘法器,它将指向 out 的指针设置为本地矩阵,并将 a 和 b 设置为传入的矩阵,执行 kick,然后等待,然后返回局部矩阵。
对于幂,您使用上面的乘数来构建一个二的幂的塔,同时根据指数的位进行乘法累加到您的结果中(再次使用上面的乘数)。
上述更高级的版本可以允许乘法和std::future<Matrix>s(以及未来矩阵的乘法)排队。