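【Posted】: 2020-05-26 23:35:07
【Question】:
I have a situation where one thread occasionally needs to wake up several worker threads. Each worker thread needs to do its work (only) once and then go back to sleep until the next notification. I'm using a condition_variable to wake everything up, but the part I'm having trouble with is the "only once". Assume each thread is expensive to create, so I don't want to create and join them every time.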
// g++ -Wall -o threadtest -pthread threadtest.cpp
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

std::mutex condMutex;
std::condition_variable condVar;
bool dataReady = false;

void state_change_worker(int id)
{
    while (1)
    {
        {
            std::unique_lock<std::mutex> lck(condMutex);
            condVar.wait(lck, [] { return dataReady; });
            // Do work only once.
            std::cout << "thread " << id << " working\n";
        }
    }
}

int main()
{
    // Create some worker threads.
    std::thread threads[5];
    for (int i = 0; i < 5; ++i)
        threads[i] = std::thread(state_change_worker, i);

    while (1)
    {
        // Signal to the worker threads to work.
        {
            std::cout << "Notifying threads.\n";
            std::unique_lock<std::mutex> lck(condMutex);
            dataReady = true;
            condVar.notify_all();
        }
        // It would be really great if I could wait() on all of the
        // worker threads being done with their work here, but it's
        // not strictly necessary.
        std::cout << "Sleep for a bit.\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}
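Update: here is a version that implements a squad lock and almost, but not quite, works. The problem is that I can't guarantee that every thread gets a chance to wake up and decrement the count in waitForLeader() before the next round starts.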
// g++ -Wall -o threadtest -pthread threadtest.cpp
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

class SquadLock
{
public:
    void waitForLeader()
    {
        {
            // Increment count to show that we are waiting in queue.
            // Also, if we are the thread that reached the target, signal
            // to the leader that everything is ready.
            std::unique_lock<std::mutex> count_lock(count_mutex_);
            std::unique_lock<std::mutex> target_lock(target_mutex_);
            if (++count_ >= target_)
                count_cond_.notify_one();
        }
        // Wait for leader to signal done.
        std::unique_lock<std::mutex> lck(done_mutex_);
        done_cond_.wait(lck, [&] { return done_; });
        {
            // Decrement count to show that we are no longer waiting.
            // If we are the last thread set done to false.
            std::unique_lock<std::mutex> lck(count_mutex_);
            if (--count_ == 0)
            {
                done_ = false;
            }
        }
    }

    void waitForHerd()
    {
        std::unique_lock<std::mutex> lck(count_mutex_);
        count_cond_.wait(lck, [&] { return count_ >= target_; });
    }

    void leaderDone()
    {
        std::unique_lock<std::mutex> lck(done_mutex_);
        done_ = true;
        done_cond_.notify_all();
    }

    void incrementTarget()
    {
        std::unique_lock<std::mutex> lck(target_mutex_);
        ++target_;
    }

    void decrementTarget()
    {
        std::unique_lock<std::mutex> lck(target_mutex_);
        --target_;
    }

    void setTarget(int target)
    {
        std::unique_lock<std::mutex> lck(target_mutex_);
        target_ = target;
    }

private:
    // Condition variable to indicate that the leader is done.
    std::mutex done_mutex_;
    std::condition_variable done_cond_;
    bool done_ = false;

    // Count of currently waiting tasks.
    std::mutex count_mutex_;
    std::condition_variable count_cond_;
    int count_ = 0;

    // Target number of tasks ready for the leader.
    std::mutex target_mutex_;
    int target_ = 0;
};

SquadLock squad_lock;
std::mutex print_mutex;

void state_change_worker(int id)
{
    while (1)
    {
        // Wait for the leader to signal that we are ready to work.
        squad_lock.waitForLeader();
        {
            // Adding just a bit of sleep here makes it so that every thread wakes up, but that isn't the right way.
            // std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::unique_lock<std::mutex> lck(print_mutex);
            std::cout << "thread " << id << " working\n";
        }
    }
}

int main()
{
    // Create some worker threads and increment target for each one
    // since we want to wait until all threads are finished.
    std::thread threads[5];
    for (int i = 0; i < 5; ++i)
    {
        squad_lock.incrementTarget();
        threads[i] = std::thread(state_change_worker, i);
    }

    while (1)
    {
        // Signal to the worker threads to work.
        std::cout << "Starting threads.\n";
        squad_lock.leaderDone();
        // Wait for the worked threads to be done.
        squad_lock.waitForHerd();
        // Wait until next time, processing results.
        std::cout << "Tasks done, waiting for next time.\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}
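【Comments】:
- If you notify all threads and never reset dataReady, how do you expect your code to work as intended? In practice it would be easier to use a counter instead of a boolean: increment it when there is work to do and decrement it as the work gets done. Finally, in your worker, all of the code runs inside the lock.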
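One way to act on the comment above is to replace the boolean with two counters guarded by a single mutex: a round number that the leader bumps for each notification, and a count of workers that still have to finish the current round. The sketch below is only an illustration of that idea, not the asker's SquadLock. The names RoundGate, runRounds, startRound, waitForWorkers, waitForRound, finishRound and stop are all hypothetical, and it assumes a fixed number of workers and that stop() is called only between rounds.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not from the question): a single mutex guards all state.
class RoundGate
{
public:
    explicit RoundGate(int workers) : workers_(workers) {}

    // Leader: start a new round and wake every worker.
    void startRound()
    {
        {
            std::lock_guard<std::mutex> lck(mutex_);
            ++round_;
            remaining_ = workers_;
        }
        start_cond_.notify_all();
    }

    // Leader: block until every worker has finished the current round.
    void waitForWorkers()
    {
        std::unique_lock<std::mutex> lck(mutex_);
        done_cond_.wait(lck, [this] { return remaining_ == 0; });
    }

    // Leader: ask the workers to exit. Assumed to be called between rounds.
    void stop()
    {
        {
            std::lock_guard<std::mutex> lck(mutex_);
            stopping_ = true;
        }
        start_cond_.notify_all();
    }

    // Worker: sleep until a round other than lastSeen has started.
    // Returns false when the worker should exit.
    bool waitForRound(int& lastSeen)
    {
        std::unique_lock<std::mutex> lck(mutex_);
        start_cond_.wait(lck, [&] { return stopping_ || round_ != lastSeen; });
        if (stopping_)
            return false;
        lastSeen = round_; // remember the round so it is handled only once
        return true;
    }

    // Worker: report that this round's work is complete.
    void finishRound()
    {
        std::lock_guard<std::mutex> lck(mutex_);
        assert(remaining_ > 0);
        if (--remaining_ == 0)
            done_cond_.notify_one();
    }

private:
    const int workers_;
    int round_ = 0;      // bumped once per notification
    int remaining_ = 0;  // workers that have not finished the current round
    bool stopping_ = false;
    std::mutex mutex_;
    std::condition_variable start_cond_;
    std::condition_variable done_cond_;
};

// Runs `rounds` rounds on `workers` long-lived threads and returns how many
// times each worker did its work.
std::vector<int> runRounds(int workers, int rounds)
{
    RoundGate gate(workers);
    std::vector<int> workDone(workers, 0);
    std::vector<std::thread> threads;
    for (int id = 0; id < workers; ++id)
    {
        threads.emplace_back([&gate, &workDone, id] {
            int lastSeen = 0;
            while (gate.waitForRound(lastSeen))
            {
                ++workDone[id]; // the real work goes here, outside the lock
                gate.finishRound();
            }
        });
    }
    for (int r = 0; r < rounds; ++r)
    {
        gate.startRound();
        gate.waitForWorkers();
    }
    gate.stop();
    for (auto& t : threads)
        t.join();
    return workDone;
}
```

Because each worker compares the shared round number with the last round it handled, a worker that loops back early simply goes to sleep again, and a worker that wakes late still sees the new round. The leader cannot start the next round until the remaining count reaches zero, so no sleep is needed to give slow threads time to wake up. Calling runRounds(5, 3) should therefore report three units of work for each of the five workers.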
Tags: c++ multithreading mutex condition-variable