【问题标题】:Two questions on std::condition_variables关于 std::condition_variable 的两个问题
【发布时间】:2020-08-31 12:30:12
【问题描述】:

我一直在试图弄清楚std::condition_variables,我对wait() 以及是使用notify_all 还是notify_one 感到特别困惑。

首先,我编写了一些代码并将其附在下面。这里有一个简短的解释:Collection 是一个持有一堆Counter 对象的类。这些Counter 对象有一个Counter::increment() 方法,需要在所有对象上一遍又一遍地调用它。为了加快速度,Collection 还维护了一个线程池来分配工作,并使用其Collection::increment_all() 方法发送所有工作。

这些线程不需要相互通信,而且Counter 对象的数量通常比线程的数量多得多。如果一个线程处理的比其他线程多于Counters,这很好,只要所有工作都完成即可。向队列添加工作很容易,只需要在“主”线程中完成。据我所知,唯一可能发生的坏事是,如果允许在正在完成的工作中调用计数器上的其他方法(例如Collection::printCounts)。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>


class Counter{
private:
    int m_count;
public:
    Counter() : m_count(0) {}
    void increment() { 
        m_count ++; 
    }
    int getCount() const { return m_count; }
};


class Collection{
public:
    Collection(unsigned num_threads, unsigned num_counters) 
    : m_shutdown(false)
    {
        // start workers
        for(size_t i = 0; i < num_threads; ++i){
            m_threads.push_back(std::thread(&Collection::work, this)); 
        }

        // intsntiate counters
        for(size_t j = 0; j < num_counters; ++j){
            m_counters.emplace_back();
        }
    }

    ~Collection() 
    { 
        m_shutdown = true;
        for(auto& t : m_threads){
            if(t.joinable()){
                t.join();
            }
        }
    }

    void printCounts() {

        // wait for work to be done
        std::unique_lock<std::mutex> lk(m_mtx);
        m_work_complete.wait(lk); // q2: do I need a while lop?  

        // print all current counters
        for(const auto& cntr : m_counters){
            std::cout << cntr.getCount() << ", ";
        }
        std::cout << "\n";
    }

    void increment_all() 
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_work_complete.wait(lock);
        for(size_t i = 0; i < m_counters.size(); ++i){
            m_which_counters_have_work.push(i);
        }

    }


private:    
    void work()
    {
        while(!m_shutdown){

            bool action = false;
            unsigned which_counter;
            {
                std::unique_lock<std::mutex> lock(m_mtx);
                if(m_which_counters_have_work.size()){
                    which_counter = m_which_counters_have_work.front();
                    m_which_counters_have_work.pop();
                    action = true;
                }else{
                    m_work_complete.notify_one(); // q1: notify_all
                }
            }

            if(action){
                m_counters[which_counter].increment();
            }
        }   
    }



    std::vector<Counter> m_counters;
    std::vector<std::thread> m_threads;
    std::condition_variable m_work_complete;
    std::mutex m_mtx;
    std::queue<unsigned> m_which_counters_have_work;
    bool m_shutdown;

};

int main() {

    int num_threads = std::thread::hardware_concurrency()-1;
    int num_counters = 10;
    Collection myCollection(num_threads, num_counters);

    myCollection.printCounts();
    myCollection.increment_all();
    myCollection.printCounts();

    myCollection.increment_all();
    myCollection.printCounts();

    return 0;
}

我在 Ubuntu 18.04 上使用 g++ -std=c++17 -pthread thread_pool.cpp -o tp &amp;&amp; ./tp 编译此代码,我认为代码实现了所有这些目标,但仍然存在一些问题:

  1. 我正在使用m_work_complete.wait(lk) 确保在开始打印所有新计数之前完成工作。 为什么我有时会看到这个写在 while 循环中,或者带有第二个参数作为 lambda 谓词函数? These docs 提到虚假唤醒。如果发生虚假唤醒,这是否意味着printCounts 可能会过早打印?如果是这样,我不想那样。我只想确保工作队列是空的,然后再开始使用应该存在的数字。

  2. 我使用的是m_work_complete.notify_all 而不是m_work_complete.notify_one。我读过this thread,我认为这并不重要——只有主线程会被这个阻塞。 使用notify_one 是否更快,这样其他线程就不必担心了?

【问题讨论】:

    标签: c++ multithreading mutex race-condition stdthread


    【解决方案1】:
    1. std::condition_variable 并不是真正的条件变量,它更像是达到某个条件的同步工具。该条件是什么取决于程序员,并且在每次condition_variable 唤醒后仍应检查它,因为当尚未达到所需条件时,它可能会虚假或“过早”唤醒。

      在 POSIX 系统上,condition_variable::wait() 委托给 pthread_cond_wait,这很容易受到虚假唤醒的影响(请参阅基本原理部分中的“条件等待语义”)。在 Linux 上,pthread_cond_wait 又通过 futex 实现,这又容易受到虚假唤醒的影响。

      所以是的,您仍然需要一个标志(受同一个互斥锁保护)或其他方式来检查工作是否实际完成。一种方便的方法是将检查包装在谓词中并将其传递给wait() 函数,该函数将循环处理直到满足谓词为止。

    2. notify_all 解除阻塞等待条件变量的所有线程; notify_one 只解锁一个(或至少一个,准确地说)。如果有多个等待线程,并且它们是等价的,即任何一个都可以完全处理条件,并且如果条件足以让一个线程继续下去(如提交一个工作单元到线程池),那么@ 987654332@ 会更有效,因为它不会不必要地解除阻塞其他线程,因为它们只会注意到没有工作要做并返回等待。如果你只有一个服务员,那么notify_onenotify_all 之间没有区别。

    【讨论】:

      【解决方案2】:

      很简单:使用notify() when;

      1. 没有理由让多个线程需要知道该事件。 (例如,使用notify() 宣布某个工作线程将“使用”的项目的可用性,从而使该项目对其他工作人员不可用)
        *AND*
      2. 没有错误线程可以被唤醒。 (例如,如果所有线程都 wait()ing 在同一个函数的同一行中,那么您可能是安全的。)

      在所有其他情况下使用notify_all()

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多