一. 条件变量
(一)条件变量概述
多线程访问一个共享资源(或称临界区),不仅需要用互斥锁实现独享访问避免并发错误,在获得互斥锁进入临界区后,还需检查特定条件是否成立。当某个线程修改测试条件后,将通知其它正在等待条件的线程继续往下执行。
1. wait线程:如果不满足该条件,拥有条件变量的wait线程会先释放该互斥锁,并把自身放入条件变量内部的等待队列,阻塞等待条件成立。
2. notify线程:在wait线程阻塞期间,notify线程获取互斥锁并进入临界区内访问共享资源,然后改变测试条件,当条件满足时通知在条件变量上等待的wait线程。wait线程重新申请对该互斥锁加锁,确认条件成立后则进行后续的任务处理,否则继续等待。
(二)std::condition_variable
class condition_variable { // class for waiting for conditions public: using native_handle_type = _Cnd_t; condition_variable() { // 默认构造函数 _Cnd_init_in_situ(_Mycnd()); } ~condition_variable() noexcept { // 析构函数 _Cnd_destroy_in_situ(_Mycnd()); } //不可复制和移动(如果重写移动,因此也就不可移动) condition_variable(const condition_variable&) = delete; condition_variable& operator=(const condition_variable&) = delete; void notify_one() noexcept { // 唤醒一个等待线程 _Check_C_return(_Cnd_signal(_Mycnd())); } void notify_all() noexcept { // 唤醒所有的等待线程 _Check_C_return(_Cnd_broadcast(_Mycnd())); } void wait(unique_lock<mutex>& _Lck) { // 等待,直到被唤醒 // Nothing to do to comply with LWG 2135 because std::mutex lock/unlock are nothrow _Check_C_return(_Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx())); } template <class _Predicate> void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // 等待信号并测试条件 while (!_Pred()) { //判断测试条件,只有当Pred不成立时才阻塞 wait(_Lck); } } //等待,直到被唤醒或者超时 template <class _Rep, class _Period> cv_status wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time) { if (_Rel_time <= chrono::duration<_Rep, _Period>::zero()) { return cv_status::timeout; } // The standard says that we should use a steady clock, but unfortunately our ABI // speaks struct xtime, which is relative to the system clock. _CSTD xtime _Tgt; const bool _Clamped = _To_xtime_10_day_clamped(_Tgt, _Rel_time); const cv_status _Result = wait_until(_Lck, &_Tgt); if (_Clamped) { return cv_status::no_timeout; } return _Result; } //带超时等待并检测条件 template <class _Rep, class _Period, class _Predicate> bool wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred) { // wait for signal with timeout and check predicate return _Wait_until1(_Lck, chrono::steady_clock::now() + _Rel_time, _Pred); } //等待,直到被唤醒或者到达某个时间点 template <class _Clock, class _Duration> cv_status wait_until(unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time) { // 等待到指定的时间点 for (;;) { const auto _Now = _Clock::now(); if (_Abs_time <= _Now) { return cv_status::timeout; } _CSTD xtime _Tgt; (void) _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now); const cv_status _Result = wait_until(_Lck, &_Tgt); if (_Result == cv_status::no_timeout) { return cv_status::no_timeout; } } } //阻塞等待,直到信号或指定时长到达,同时检测条件 template <class _Clock, class _Duration, class _Predicate> bool wait_until( unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate _Pred) { //等待带有超时的信号并检测条件 return _Wait_until1(_Lck, _Abs_time, _Pred); } cv_status wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time) { // wait for signal with timeout if (!_Mtx_current_owns(_Lck.mutex()->_Mymtx())) { _Throw_Cpp_error(_OPERATION_NOT_PERMITTED); } // Nothing to do to comply with LWG 2135 because std::mutex lock/unlock are nothrow const int _Res = _Cnd_timedwait(_Mycnd(), _Lck.mutex()->_Mymtx(), _Abs_time); switch (_Res) { case _Thrd_success: return cv_status::no_timeout; case _Thrd_timedout: return cv_status::timeout; default: _Throw_C_error(_Res); } } template <class _Predicate> bool wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate _Pred) { // wait for signal with timeout and check predicate return _Wait_until1(_Lck, _Abs_time, _Pred); } //返回原生句柄 _NODISCARD native_handle_type native_handle() { // return condition variable handle return _Mycnd(); } void _Register(unique_lock<mutex>& _Lck, int* _Ready) { // register this object for release at thread exit _Cnd_register_at_thread_exit(_Mycnd(), _Lck.release()->_Mymtx(), _Ready); } void _Unregister(mutex& _Mtx) { // unregister this object for release at thread exit _Cnd_unregister_at_thread_exit(_Mtx._Mymtx()); } private: aligned_storage_t<_Cnd_internal_imp_size, _Cnd_internal_imp_alignment> _Cnd_storage; _Cnd_t _Mycnd() noexcept { // get pointer to _Cnd_internal_imp_t inside _Cnd_storage return reinterpret_cast<_Cnd_t>(&_Cnd_storage); } template <class _Predicate> bool _Wait_until1(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate& _Pred) { // wait for signal with timeout and check predicate while (!_Pred()) { if (wait_until(_Lck, _Abs_time) == cv_status::timeout) { return _Pred(); } } return true; } template <class _Clock, class _Duration, class _Predicate> bool _Wait_until1( unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate& _Pred) { while (!_Pred()) { const auto _Now = _Clock::now(); if (_Abs_time <= _Now) { return false; } _CSTD xtime _Tgt; const bool _Clamped = _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now); if (wait_until(_Lck, &_Tgt) == cv_status::timeout && !_Clamped) { return _Pred(); } } return true; } };