【发布时间】:2020-08-31 12:30:12
【问题描述】:
我一直在试图弄清楚std::condition_variables,我对wait() 以及是使用notify_all 还是notify_one 感到特别困惑。
首先,我编写了一些代码并将其附在下面。这里有一个简短的解释:Collection 是一个持有一堆Counter 对象的类。这些Counter 对象有一个Counter::increment() 方法,需要在所有对象上一遍又一遍地调用它。为了加快速度,Collection 还维护了一个线程池来分配工作,并使用其Collection::increment_all() 方法发送所有工作。
这些线程不需要相互通信,而且Counter 对象的数量通常比线程的数量多得多。如果一个线程处理的比其他线程多于Counters,这很好,只要所有工作都完成即可。向队列添加工作很容易,只需要在“主”线程中完成。据我所知,唯一可能发生的坏事是,如果允许在正在完成的工作中调用计数器上的其他方法(例如Collection::printCounts)。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
class Counter{
private:
int m_count;
public:
Counter() : m_count(0) {}
void increment() {
m_count ++;
}
int getCount() const { return m_count; }
};
class Collection{
public:
Collection(unsigned num_threads, unsigned num_counters)
: m_shutdown(false)
{
// start workers
for(size_t i = 0; i < num_threads; ++i){
m_threads.push_back(std::thread(&Collection::work, this));
}
// intsntiate counters
for(size_t j = 0; j < num_counters; ++j){
m_counters.emplace_back();
}
}
~Collection()
{
m_shutdown = true;
for(auto& t : m_threads){
if(t.joinable()){
t.join();
}
}
}
void printCounts() {
// wait for work to be done
std::unique_lock<std::mutex> lk(m_mtx);
m_work_complete.wait(lk); // q2: do I need a while lop?
// print all current counters
for(const auto& cntr : m_counters){
std::cout << cntr.getCount() << ", ";
}
std::cout << "\n";
}
void increment_all()
{
std::unique_lock<std::mutex> lock(m_mtx);
m_work_complete.wait(lock);
for(size_t i = 0; i < m_counters.size(); ++i){
m_which_counters_have_work.push(i);
}
}
private:
void work()
{
while(!m_shutdown){
bool action = false;
unsigned which_counter;
{
std::unique_lock<std::mutex> lock(m_mtx);
if(m_which_counters_have_work.size()){
which_counter = m_which_counters_have_work.front();
m_which_counters_have_work.pop();
action = true;
}else{
m_work_complete.notify_one(); // q1: notify_all
}
}
if(action){
m_counters[which_counter].increment();
}
}
}
std::vector<Counter> m_counters;
std::vector<std::thread> m_threads;
std::condition_variable m_work_complete;
std::mutex m_mtx;
std::queue<unsigned> m_which_counters_have_work;
bool m_shutdown;
};
int main() {
int num_threads = std::thread::hardware_concurrency()-1;
int num_counters = 10;
Collection myCollection(num_threads, num_counters);
myCollection.printCounts();
myCollection.increment_all();
myCollection.printCounts();
myCollection.increment_all();
myCollection.printCounts();
return 0;
}
我在 Ubuntu 18.04 上使用 g++ -std=c++17 -pthread thread_pool.cpp -o tp && ./tp 编译此代码,我认为代码实现了所有这些目标,但仍然存在一些问题:
我正在使用
m_work_complete.wait(lk)确保在开始打印所有新计数之前完成工作。 为什么我有时会看到这个写在while循环中,或者带有第二个参数作为 lambda 谓词函数? These docs 提到虚假唤醒。如果发生虚假唤醒,这是否意味着printCounts可能会过早打印?如果是这样,我不想那样。我只想确保工作队列是空的,然后再开始使用应该存在的数字。我使用的是
m_work_complete.notify_all而不是m_work_complete.notify_one。我读过this thread,我认为这并不重要——只有主线程会被这个阻塞。 使用notify_one是否更快,这样其他线程就不必担心了?
【问题讨论】:
标签: c++ multithreading mutex race-condition stdthread