【问题标题】:Producer-Consumer: Lost Wake-up issue生产者-消费者:失去唤醒问题
【发布时间】:2013-12-02 09:00:31
【问题描述】:

我试图为生产者-消费者问题编写代码。下面的代码大部分时间都可以正常工作,但有时会因为“Lost Wake-up”(我猜)而卡住。我尝试了线程 sleep() 但它没有用。在我的代码中处理这种情况需要进行哪些修改?信号量在这里有用吗?如果是,我将如何在这里实现它们?

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>

using namespace std;

int product = 0;
boost::mutex mutex;
boost::condition_variable cv;
boost::condition_variable pv;
bool done = false;

void consumer(){
    while(done==false){
        //cout << "start c" << endl
        boost::mutex::scoped_lock lock(mutex);
        cv.wait(lock);
        //cout << "wakeup c" << endl;
        if (done==false)
        {
            cout << product << endl;
            //cout << "notify c" << endl;
            pv.notify_one();
        }
        //cout << "end c" << endl;
    }
}

void producer(){
    for(int i=0;i<10;i++){
        //cout << "start p" << endl;
        boost::mutex::scoped_lock lock(mutex);
        boost::this_thread::sleep(boost::posix_time::microseconds(50000));
        ++product;
        //cout << "notify p" << endl;
        cv.notify_one();
        pv.wait(lock);
        //cout << "wakeup p" << endl;
    }
    //cout << "end p" << endl;
    cv.notify_one();
    done = true;
}

int main()
{
    int t = 1000;
    while(t--){
        /*
        This is not perfect, and is prone to a subtle issue called the lost wakeup (for example, producer calls notify() 
        on the condition, but client hasn't really called wait() yet, then both will wait() indefinitely.) 
        */
        boost::thread consumerThread(&consumer);    
        boost::thread producerThread(&producer);

        producerThread.join();
        consumerThread.join();
        done =false;
        //cout << "process end" << endl;
    }
    cout << "done" << endl;
    getchar();
    return 0;
}

【问题讨论】:

标签: c++ multithreading boost synchronization boost-thread


【解决方案1】:

是的,您想要一种方法来(在消费者中)知道您“错过”了一个信号。信号量可以提供帮助。给猫剥皮的方法不止一种,下面是我的简单介绍(仅使用 c++11 标准库功能):

class semaphore
{
private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;

public: 
    semaphore(int count_ = 0) : count(count_) { }

    void notify()
    {
        std::unique_lock<std::mutex> lck(mtx);
        ++count;
        cv.notify_one();
    }

    void wait() { return wait([]{}); }  // no-op action

    template <typename F>
    auto wait(F&& func = []{}) -> decltype(std::declval<F>()())
    {
        std::unique_lock<std::mutex> lck(mtx);

        while(count == 0){
            cv.wait(lck);
        }
        count--;

        return func();
    }
};

为方便起见,我添加了一个方便的wait() 重载,它接受一个在锁下执行的函数。这使得消费者无需手动操作锁就可以操作“信号量”(并且在没有数据竞争的情况下仍然获得product 的值):

semaphore sem;

void consumer() {
    do {
        bool stop = false;
        int received_product = sem.wait([&stop] { stop = done; return product; });

        if (stop)
            break;

        std::cout << received_product << std::endl;

        std::unique_lock<std::mutex> lock(processed_mutex);
        processed_signal.notify_one();
    } while(true);
}

一个完整的演示:Live on Coliru

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <cassert>

class semaphore
{
private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;

public: 
    semaphore(int count_ = 0) : count(count_) { }

    void notify()
    {
        std::unique_lock<std::mutex> lck(mtx);
        ++count;
        cv.notify_one();
    }

    void wait() { return wait([]{}); }  // no-op action

    template <typename F>
    auto wait(F&& func = []{}) -> decltype(std::declval<F>()())
    {
        std::unique_lock<std::mutex> lck(mtx);

        while(count == 0){
            cv.wait(lck);
        }
        count--;

        return func();
    }
};

semaphore sem;

int product = 0;
std::mutex processed_mutex;
std::condition_variable processed_signal;

bool done = false;

void consumer(int check) {
    do {
        bool stop = false;
        int received_product = sem.wait([&stop] { stop = done; return product; });

        if (stop)
            break;

        std::cout << received_product << std::endl;
        assert(++check == received_product);

        std::unique_lock<std::mutex> lock(processed_mutex);
        processed_signal.notify_one();
    } while(true);
}

void producer() {
    std::unique_lock<std::mutex> lock(processed_mutex);
    for(int i = 0; i < 10; ++i) {
        ++product;
        sem.notify();
        processed_signal.wait(lock);
    }
    done = true;
    sem.notify();
}

int main() {
    int t = 1000;
    while(t--) {
        std::thread consumerThread(&consumer, product);
        std::thread producerThread(&producer);
        producerThread.join();
        consumerThread.join();
        done = false;
        std::cout << "process end" << std::endl;
    }
    std::cout << "done" << std::endl;
}

【讨论】:

  • processed_signal 通知仍然会丢失
  • @stefaanv 如果您演示了如何操作,我将不胜感激。我认为processed_mutex 充分保护了这一点。消费者在获得互斥锁之前不可能向processed_signal 发出信号(有点用词不当,顺便说一句:)):因此,根据定义,生产者首先等待条件变量。
  • 你是对的,虽然这主要是因为锁在整个生产者线程周围,这在大多数情况下超出了拥有线程的目的,但在这里没关系。我没发现。
  • @stefaanv 实际上,那个锁逻辑上不是这里的锁。这是一个同步原语。将其视为“负临界区”;它在等待条件变量时被“打开”。当然可以调整,但这是由 OP 决定
【解决方案2】:

您似乎忽略了变量done 也是共享状态,与product 的扩展相同。这可能导致几个比赛条件。在您的情况下,我至少看到consumerThread 没有任何进展的情况:

  1. 循环执行的目的
  2. 消费者执行,并在cv.wait(lock); 等待
  3. 生产者完成for循环,通知消费者被抢占
  4. 消费者醒来,阅读"done==false",输出产品,再次阅读done == false,等待条件
  5. 生产者设置完成为真并退出
  6. 消费者永远卡住了

为避免此类问题,您应该在读取或写入完成时持有锁。顺便说一句,您的实现是相当有序的,即生产者和消费者当时只能处理一条数据......

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-07-27
  • 2011-08-29
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多