一. 条件变量

(一)条件变量概述

  多线程访问一个共享资源(或称临界区),不仅需要用互斥锁实现独享访问避免并发错误,在获得互斥锁进入临界区后,还需检查特定条件是否成立。当某个线程修改测试条件后,将通知其它正在等待条件的线程继续往下执行。

  1. wait线程:如果不满足该条件,拥有条件变量的wait线程会先释放该互斥锁,并把自身放入条件变量内部的等待队列,阻塞等待条件成立

  2. notify线程:在wait线程阻塞期间,notify线程获取互斥锁并进入临界区内访问共享资源,然后改变测试条件,当条件满足时通知在条件变量上等待的wait线程。wait线程重新申请对该互斥锁加锁,确认条件成立后则进行后续的任务处理,否则继续等待。

(二)std::condition_variable

class condition_variable { // class for waiting for conditions
public:
    using native_handle_type = _Cnd_t;

    condition_variable() { // 默认构造函数
        _Cnd_init_in_situ(_Mycnd());
    }

    ~condition_variable() noexcept { // 析构函数
        _Cnd_destroy_in_situ(_Mycnd());
    }

    //不可复制和移动(如果重写移动,因此也就不可移动)
    condition_variable(const condition_variable&) = delete;
    condition_variable& operator=(const condition_variable&) = delete;

    void notify_one() noexcept { // 唤醒一个等待线程
        _Check_C_return(_Cnd_signal(_Mycnd()));
    }

    void notify_all() noexcept { // 唤醒所有的等待线程
        _Check_C_return(_Cnd_broadcast(_Mycnd()));
    }

    void wait(unique_lock<mutex>& _Lck) { // 等待,直到被唤醒
        // Nothing to do to comply with LWG 2135 because std::mutex lock/unlock are nothrow
        _Check_C_return(_Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx()));
    }

    template <class _Predicate>
    void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // 等待信号并测试条件
        while (!_Pred()) { //判断测试条件,只有当Pred不成立时才阻塞
            wait(_Lck);
        }
    }

    //等待,直到被唤醒或者超时
    template <class _Rep, class _Period>
    cv_status wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time) {
        if (_Rel_time <= chrono::duration<_Rep, _Period>::zero()) {
            return cv_status::timeout;
        }

        // The standard says that we should use a steady clock, but unfortunately our ABI
        // speaks struct xtime, which is relative to the system clock.
        _CSTD xtime _Tgt;
        const bool _Clamped     = _To_xtime_10_day_clamped(_Tgt, _Rel_time);
        const cv_status _Result = wait_until(_Lck, &_Tgt);
        if (_Clamped) {
            return cv_status::no_timeout;
        }

        return _Result;
    }

    //带超时等待并检测条件
    template <class _Rep, class _Period, class _Predicate>
    bool wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred) {
        // wait for signal with timeout and check predicate
        return _Wait_until1(_Lck, chrono::steady_clock::now() + _Rel_time, _Pred);
    }

    //等待,直到被唤醒或者到达某个时间点
    template <class _Clock, class _Duration>
    cv_status wait_until(unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time) {
        // 等待到指定的时间点
        for (;;) {
            const auto _Now = _Clock::now();
            if (_Abs_time <= _Now) {
                return cv_status::timeout;
            }

            _CSTD xtime _Tgt;
            (void) _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now);
            const cv_status _Result = wait_until(_Lck, &_Tgt);
            if (_Result == cv_status::no_timeout) {
                return cv_status::no_timeout;
            }
        }
    }

    //阻塞等待,直到信号或指定时长到达,同时检测条件
    template <class _Clock, class _Duration, class _Predicate>
    bool wait_until(
        unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate _Pred) {
        //等待带有超时的信号并检测条件
        return _Wait_until1(_Lck, _Abs_time, _Pred);
    }

    cv_status wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time) {
        // wait for signal with timeout
        if (!_Mtx_current_owns(_Lck.mutex()->_Mymtx())) {
            _Throw_Cpp_error(_OPERATION_NOT_PERMITTED);
        }

        // Nothing to do to comply with LWG 2135 because std::mutex lock/unlock are nothrow
        const int _Res = _Cnd_timedwait(_Mycnd(), _Lck.mutex()->_Mymtx(), _Abs_time);
        switch (_Res) {
        case _Thrd_success:
            return cv_status::no_timeout;
        case _Thrd_timedout:
            return cv_status::timeout;
        default:
            _Throw_C_error(_Res);
        }
    }

    template <class _Predicate>
    bool wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate _Pred) {
        // wait for signal with timeout and check predicate
        return _Wait_until1(_Lck, _Abs_time, _Pred);
    }

    //返回原生句柄
    _NODISCARD native_handle_type native_handle() { // return condition variable handle
        return _Mycnd();
    }

    void _Register(unique_lock<mutex>& _Lck, int* _Ready) { // register this object for release at thread exit
        _Cnd_register_at_thread_exit(_Mycnd(), _Lck.release()->_Mymtx(), _Ready);
    }

    void _Unregister(mutex& _Mtx) { // unregister this object for release at thread exit
        _Cnd_unregister_at_thread_exit(_Mtx._Mymtx());
    }

private:
    aligned_storage_t<_Cnd_internal_imp_size, _Cnd_internal_imp_alignment> _Cnd_storage;

    _Cnd_t _Mycnd() noexcept { // get pointer to _Cnd_internal_imp_t inside _Cnd_storage
        return reinterpret_cast<_Cnd_t>(&_Cnd_storage);
    }

    template <class _Predicate>
    bool _Wait_until1(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate& _Pred) {
        // wait for signal with timeout and check predicate
        while (!_Pred()) {
            if (wait_until(_Lck, _Abs_time) == cv_status::timeout) {
                return _Pred();
            }
        }

        return true;
    }

    template <class _Clock, class _Duration, class _Predicate>
    bool _Wait_until1(
        unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate& _Pred) {
        while (!_Pred()) {
            const auto _Now = _Clock::now();
            if (_Abs_time <= _Now) {
                return false;
            }

            _CSTD xtime _Tgt;
            const bool _Clamped = _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now);
            if (wait_until(_Lck, &_Tgt) == cv_status::timeout && !_Clamped) {
                return _Pred();
            }
        }

        return true;
    }
};
【std::condition_variable源码摘要】

相关文章: