【发布时间】:2017-08-01 15:41:04
【问题描述】:
我想仅使用 C++ 11 的标准库来实现线程池。我想公开的接口是允许我的主线程一次提交许多作业,并等到所有线程完成后再继续。这是我第一次明确地处理线程,所以不可避免地会遇到一些死锁问题。这是我的代码:
class CrashQueue {
private:
std::vector<std::thread> workers;
std::queue<void*> payloads;
std::function<void(void*)> function;
std::mutex taskFetchingMutex;
long aliveWorkers;
std::condition_variable alarmClock;
std::condition_variable sleepClock;
std::mutex sleepClockMutex;
bool running = true;
public:
CrashQueue(std::size_t threadCount = std::thread::hardware_concurrency()) {
for (std::size_t i = 0; i < threadCount; i ++) {
workers.emplace_back([this]() -> void {
while (running) {
void* payload;
{
std::unique_lock<std::mutex> lock(taskFetchingMutex);
if (payloads.empty()) {
aliveWorkers --;
if (aliveWorkers == 0)
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}
payload = payloads.front();
payloads.pop();
}
function(payload);
}
});
}
}
~CrashQueue() {
running = false;
alarmClock.notify_all();
for (auto& worker : workers)
worker.join();
}
void run() {
this->aliveWorkers = workers.size();
alarmClock.notify_all();
std::unique_lock<std::mutex> lock(sleepClockMutex);
sleepClock.wait(lock);
}
void commit(std::function<void(void*)>&& function, std::queue<void*>&& payloads) {
this->function = std::move(function);
this->payloads = std::move(payloads);
}
};
我怀疑问题出在工作线程中执行的构造函数的 lambda 表达式中:
if (payloads.empty()) {
aliveWorkers --;
if (aliveWorkers == 0)
sleepClock.notify_one();
alarmClock.wait(lock);
continue;
}
可能是最后一个工作线程唤醒主线程并在主线程唤醒所有其他线程之后进入睡眠。尽管如此,这似乎不太可能,但每次我不处于调试模式时都会发生死锁。有什么提示吗?
【问题讨论】:
-
aliveWorkers必须是std::atomic<long>
标签: c++ multithreading