【问题标题】:producer/consumer using boost threads and circular buffer hangs生产者/消费者使用提升线程和循环缓冲区挂起
【发布时间】:2012-04-17 11:31:42
【问题描述】:

我想通了。我犯了一个愚蠢的错误,我实际上并没有从队列中删除元素,我只是在读取第一个元素。我修改了代码,下面的代码不起作用。谢谢大家的帮助。

我正在尝试使用 boost 来实现生产者消费者问题,这实际上是一个更大项目的一部分。我已经从互联网上的示例中实现了一个程序,甚至还有一些我在这里找到的帮助。但是目前我的代码只是挂起。基于一些好的建议,我决定使用 boost ciruclar 缓冲区在生产者和消费者之间保存我的数据。那里有很多类似的代码,我能够从中汇集想法并自己编写一些东西。但是,我似乎仍然遇到与以前相同的问题(这是我的程序只是挂起)。我以为我没有像以前那样犯同样的错误..

下面给出了我的代码,我已经取出了我之前的代码,其中我只是我自己的单链接列表。

缓冲区头:

#ifndef PCDBUFFER_H
#define PCDBUFFER_H

#include <pcl/io/pcd_io.h>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/circular_buffer.hpp>

class pcdBuffer
{
    public:
        pcdBuffer(int buffSize);
        void put(int data);
        int get();
        bool isFull();
        bool isEmpty();
        int getSize();
        int getCapacity();
    private:
        boost::mutex bmutex;
        boost::condition_variable buffEmpty;
        boost::condition_variable buffFull;
        boost::circular_buffer<int> buffer;
};


#endif

缓冲源(仅相关部分):

#include "pcdBuffer.h"
#include <iostream>

//boost::mutex io_mutex;

pcdBuffer::pcdBuffer(int buffSize)
{
    buffer.set_capacity(buffSize);
}

void pcdBuffer::put(int data)
{
    {
        boost::mutex::scoped_lock buffLock(bmutex);
        while(buffer.full())
        {
            std::cout << "Buffer is full" << std::endl;
            buffFull.wait(buffLock);
        }
        buffer.push_back(data);
    }
    buffEmpty.notify_one();
}

int pcdBuffer::get()
{
    int data;
    {
        boost::mutex::scoped_lock buffLock(bmutex);
        while(buffer.empty())
        {
            std::cout << "Buffer is empty" << std::endl;
            buffEmpty.wait(buffLock);
        }
        data = buffer.front();
            buffer.pop_front();
    }
    buffFull.notify_one();
    return data;
}

代码的主要驱动:

#include <iostream>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <unistd.h>
#include "pcdBuffer.h"

pcdBuffer buff(100);

void producer()
{
    int i = 10;
    while (true)
    {
        buff.put(i);
        i++;
    }
}

void consumer()
{
    int i;
    while(true)
    {
        i = buff.get();
        std::cout << "Data: " << i << std::endl;
    }
}

int main(int argc, char** argv)
{
    std::cout << "Starting main...." << std::endl;
    std::cout << "Buffer Details: " << std::endl;
    std::cout << "Capacity: " << buff.getCapacity() << ", isEmpty: " << buff.isEmpty() << ", isFull: " << buff.isFull() << std::endl;
    boost::thread cons(consumer);
    sleep(5);
    boost::thread prod(producer);
    prod.join();
    cons.join();
    return 0;
}

我的缓冲区容量已正确初始化为 100。消费者线程等待并报告“缓冲区为空”5 秒,但之后我只是从 put 方法和“数据”中获得“缓冲区已满”: 10" 从消费者功能在标准输出上交替。如您所见,10 是我放入的第一个元素。似乎缓冲区已满,并没有通知消费者,但我检查了我的锁,并认为它们是正确的。非常感谢您对此提供任何帮助。

这是我编写此代码的参考链接:

http://www.boost.org/doc/libs/1_49_0/libs/circular_buffer/doc/circular_buffer.html#classboost_1_1circular__buffer_19ba12c0142a21a7d960877c22fa3ea00

http://www.drdobbs.com/article/print?articleId=184401518&siteSectionName=cpp

Thread safe implementation of circular buffer

【问题讨论】:

  • 那篇文章真的很有趣,但是,我想避免 stl 队列的另一个原因是速度。最终,我将在线程之间共享的数据结构是一个点云,它表示来自类似 kinect 的相机的帧捕获。我想以 30fps 的速度捕获帧并将其保存在磁盘上。因此,我想尽可能地减少我的开销。这是我避免使用 STL 队列的另一个原因。正如文章建议的那样,我在通知条件变量之前尝试解锁我的锁,但它仍然挂起。我在这里更新了我的代码。
  • 先让它工作,然后让它快速。避免假设您的代码/设计的哪些部分会很慢。避免过早优化。将工作重点放在重要部分,同时尽可能使用标准组件。如果结果太慢,测量哪个部分慢,然后优化该部分
  • 这是一些很好的建议。我从 stefaanv 链接的文章中复制并实现了代码,它比我丑陋的代码更容易理解和理解。我会用点云数据来试试这个,希望它能足够快地完成我需要做的事情。

标签: c++ boost producer-consumer


【解决方案1】:

首先,与其编写自己的列表,不如将std::list 包裹在pcdQueue 中,而不是编写自己的列表。正确的是,std::list 不是线程安全的原样,但无论如何你在你的类中提供了必要的同步原语。

你的程序挂起的原因: 您保留锁定并填充队列直到它已满。您通过notify_one 通知消费者是没有用的,因为您的消费者将再次锁定,因为互斥体已经被占用(由生产者中的锁定)。

当您通过等待condition_variable 最终释放锁(当队列已满时)时,您不会唤醒您的消费者,因此您的消费者和生产者都被阻塞并且您的程序挂起。

改成:

void pcdQueue::produce()
{
    int i=0;
    while(true)
    {
        {
            boost::mutex::scoped_lock lock(qmutex);
            while( ! qlen < buffSize ) {
                std::cout << "Queue is full" << std::endl;
                full.wait(lock);
            }

            enqueue(i); // or myList.push_back(i) if you switch to std::list
        }

        empty.notify_one();


    }
}

您的consume()方法也有同样的问题。将其更改为:

pcdFrame* pcdQueue::consume()
{
    pcdFrame *frame;

    {
        boost::mutex::scoped_lock lock(qmutex);
        while( qlen == 0 ) {
            std::cout << "Queue is empty" << std::endl;
            empty.wait(lock);
        }

        frame = dequeue();
    }
    full.notify_one();

    return frame;
}

一般来说,请注意通知只有在有人在等待时才有效。否则,他们就会“迷失”。此外,请注意,在调用notify_one 时不需要保持互斥锁锁定(事实上,这可能会导致额外的上下文切换开销,因为您唤醒了另一个线程,该线程随后将等待当前的互斥锁(仍然)锁定(由你)。所以首先,释放互斥锁,然后告诉另一个线程继续。

请注意,两个线程都是无限运行的,所以你的主程序仍然会在第一个join() 处挂起并且永远不会退出。您可以在while-loops 中包含一个停止标志,以告诉您的线程完成。

【讨论】:

    【解决方案2】:

    Boost 现在在它的 lockfree 部分提供了一种生产者/消费者队列类型,它基本上是无锁的,尽管如果队列填满它可能会锁定。

    您可以在此处找到文档:

    http://www.boost.org/doc/libs/1_54_0/doc/html/lockfree.html

    这是一个相当新的添加,所以我认为它还不是许多标准包的一部分。另一方面,这看起来主要是标头,所有这些都取决于相当低级的系统东西,这些东西已经被提升了很长时间。

    【讨论】:

      【解决方案3】:

      我遇到了同样的悬挂问题。多亏了 Johannes 的回答,我才能让它完全正常工作。但是,我不得不使用 full.wait_for(lock, boost::chrono::milliseconds(100)); 在消费者和生产者使用非常短的循环缓冲区(3-4 个元素)时防止挂起。

      另外,我用 while (buffer->empty() 代替了 while (true)。

      最后,一切都可靠且快速地运行。

      【讨论】:

        猜你喜欢
        • 2017-04-07
        • 2019-06-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-11-15
        • 1970-01-01
        • 1970-01-01
        • 2011-03-21
        相关资源
        最近更新 更多