【问题标题】:Thread locks occuring using boost::thread. What's wrong with my condition variables?使用 boost::thread 发生的线程锁。我的条件变量有什么问题?
【发布时间】:2013-01-13 04:56:16
【问题描述】:

我编写了一个 Link 类,用于在网络中的两个节点之间传递数据。我用两个双端队列实现了它(一个用于从节点 0 到节点 1 的数据,另一个用于从节点 1 到节点 0 的数据)。我正在尝试对应用程序进行多线程处理,但我遇到了线程锁。我试图防止同时读取和写入同一个双端队列。在阅读有关我最初如何实现此功能的更多信息时,我认为我错误地使用了条件变量(也许不应该使用布尔变量?)。我应该有两个互斥锁,每个双端队列一个吗?如果可以的话请帮忙。谢谢!

class Link {
public:
// other stuff...
void push_back(int sourceNodeID, Data newData);
void get(int destinationNodeID, std::vector<Data> &linkData);

private:
// other stuff...
std::vector<int> nodeIDs_;
// qVector_ has two deques, one for Data from node 0 to node 1 and 
// one for Data from node 1 to node 0
std::vector<std::deque<Data> > qVector_; 
void initialize(int nodeID0, int nodeID1);

boost::mutex mutex_;
std::vector<boost::shared_ptr<boost::condition_variable> > readingCV_;
std::vector<boost::shared_ptr<boost::condition_variable> > writingCV_;
std::vector<bool> writingData_;
std::vector<bool> readingData_;
};

push_back 函数:

void Link::push_back(int sourceNodeID, Data newData)
{
int idx;
if (sourceNodeID == nodeIDs_[0]) idx = 1;
else 
{
    if (sourceNodeID == nodeIDs_[1]) idx = 0;
    else throw runtime_error("Link::push_back: Invalid node ID");
}

boost::unique_lock<boost::mutex> lock(mutex_);
// pause to avoid multithreading collisions
while (readingData_[idx]) readingCV_[idx]->wait(lock);

writingData_[idx] = true;
qVector_[idx].push_back(newData);
writingData_[idx] = false;
writingCV_[idx]->notify_all();
}

get函数:

void Link::get(int destinationNodeID,
std::vector<Data> &linkData)
{
int idx;
if (destinationNodeID == nodeIDs_[0]) idx = 0;
else 
{
    if (destinationNodeID == nodeIDs_[1]) idx = 1;
    else throw runtime_error("Link::get: Invalid node ID");
}

boost::unique_lock<boost::mutex> lock(mutex_);
// pause to avoid multithreading collisions
while (writingData_[idx]) writingCV_[idx]->wait(lock);
readingData_[idx] = true;

std::copy(qVector_[idx].begin(),qVector_[idx].end(),back_inserter(linkData));
qVector_[idx].erase(qVector_[idx].begin(),qVector_[idx].end());
readingData_[idx] = false;
readingCV_[idx]->notify_all();
return;
}

这里是初始化(如果有帮助的话)

void Link::initialize(int nodeID0, int nodeID1)
{
readingData_ = std::vector<bool>(2,false);
writingData_ = std::vector<bool>(2,false);
for (int i = 0; i < 2; ++i)
{
    readingCV_.push_back(make_shared<boost::condition_variable>());
    writingCV_.push_back(make_shared<boost::condition_variable>());
}
nodeIDs_.reserve(2);
nodeIDs_.push_back(nodeID0);
nodeIDs_.push_back(nodeID1);
qVector_.reserve(2);
qVector_.push_back(std::deque<Data>());
qVector_.push_back(std::deque<Data>());
}

【问题讨论】:

    标签: c++ multithreading boost condition-variable


    【解决方案1】:

    我正在尝试对应用程序进行多线程处理,但我遇到了线程锁。

    什么是“线程锁”?很难看出你的代码试图完成什么。首先考虑您的 push_back() 代码,其同步部分如下所示:

    boost::unique_lock<boost::mutex> lock(mutex_);
    
    while (readingData_[idx]) readingCV_[idx]->wait(lock);
    
    writingData_[idx] = true;
    qVector_[idx].push_back(newData);
    writingData_[idx] = false;
    writingCV_[idx]->notify_all();
    

    您的writingData[idx] 布尔值一开始是假的,只有在线程锁定互斥体时才暂时变为真。当互斥锁被释放时,它又是假的。因此,对于必须等待获取互斥锁的任何 other 线程,writingData[idx]永远 为真。

    但是在你的 get() 代码中,你有

    boost::unique_lock<boost::mutex> lock(mutex_);
    // pause to avoid multithreading collisions
    while (writingData_[idx]) writingCV_[idx]->wait(lock);
    

    当线程获得互斥锁上的锁时,writingData[idx] 恢复为 false,因此 从不 进入 while 循环(并在 CV 上等待)。

    完全对称的分析适用于 readingData[idx] 布尔值,它在互斥锁之外也总是为 false。

    所以您的条件变量从不等待。您需要彻底重新考虑您的设计。

    从每个队列一个互斥锁开始(双端队列对于简单地传递数据来说是多余的),并且为每个队列关联一个条件变量与非空队列。因此,get() 方法将等到队列非空,这将在push_back() 方法中发出信号。像这样的东西(未经测试的代码):

    template <typename Data>
    class BasicQueue
    {
    public:
        void push( Data const& data )
        {
            boost::unique_lock  _lock( mutex_ );
            queue_.push_back( data );
            not_empty_.notify_all();
        }
    
        void get ( Data& data )
        {
            boost::unique_lock  _lock( mutex_ );
            while ( queue_.size() == 0 )
                not_empty_.wait( _lock ); // this releases the mutex
            // mutex is reacquired here, with queue_.size() > 0
            data = queue_.front();
            queue_.pop_front();         
        }
    
    private:
        std::queue<Data>            queue_;
        boost::mutex                mutex_;
        boost::condition_variable   not_empty_;
    };
    

    【讨论】:

    • 感谢您的回复。我的意思是“僵局”——这就是我在午夜写一个问题的结果。另外,我当然需要清理这个实现,但我认为这是与条件变量绑定的布尔变量应该工作的方式。我想允许在另一个线程没有写入同一个双端队列的任何时候读取get()。所以,我只在线程实际写入数据时将writingData 设置为true。布尔标志是否需要由锁定的互斥锁保护,以便读取和写入不会同时发生?
    • 您将其设置回false 在同一个锁的范围内!到 另一个 线程检查布尔值时,这个新线程只能在 获取互斥锁后(即 第一个线程已释放它,布尔值又是假的。
    • 您可能对简历的用途感到困惑。当线程在 CV 上等待时,它(原子地)释放互斥锁,以便其他线程可以获取互斥锁,对共享变量进行操作,并(可能)发出第一个线程正在等待的条件的信号。 CV 不是为了“通知”其他线程关于该线程可能在做什么或可能不做什么。请查看我添加到答案中的代码。请注意读取线程必须等待某些数据出现在队列中,这是只有写入线程才能执行的操作(然后发出信号)。
    • 您提供的代码中没有死锁。可能有很多旋转(例如,您的 get() 函数不能保证双端队列中有任何内容。)
    • 显然我对简历的用途感到困惑。我在想,只要有锁,就需要一份简历。谢谢你帮我解决这个问题。回到绘图板...
    【解决方案2】:

    是的。你需要两个互斥锁。您的死锁几乎可以肯定是单个互斥体争用的结果。如果您使用调试器闯入正在运行的程序,您将看到线程挂起的位置。我也不明白你为什么需要布尔值。 (编辑:可能会提出一种使用单个互斥锁的设计,但每个共享数据结构使用一个互斥锁更简单、更安全)

    经验法则是,每个要保护的共享数据结构都有一个互斥锁。该互斥锁保护数据结构免受并发访问并提供线程安全。在您的情况下,每个双端队列一个互斥锁。例如:

    class SafeQueue
    {
    private:
      std::deque<Data> q_;
      boost::mutex m_;
      boost::condition_variable v_;
    
    public:
      void push_back(Data newData)
      {
        boost::lock_guard<boost::mutex> lock(m_);
        q_.push_back(newData);
        // notify etc.
      }
    // ...
    };
    

    关于通过条件变量的通知,请参见此处:

    Using condition variable in a producer-consumer situation

    因此,每个对象也会有一个condition_variable,生产者将通知该对象,而消费者将等待。现在您可以创建两个这样的队列来进行双向通信。请记住,如果两个线程都被阻塞(等待数据)并且两个队列都是空的,那么只有两个线程仍然可能死锁。

    【讨论】:

    • 也感谢您的帮助。我当然需要简化很多(并阅读更多内容!)。
    猜你喜欢
    • 2014-02-25
    • 1970-01-01
    • 2011-02-07
    • 1970-01-01
    • 1970-01-01
    • 2020-05-26
    • 1970-01-01
    • 1970-01-01
    • 2015-07-08
    相关资源
    最近更新 更多