【问题标题】:C++ Semaphore Confusion?C ++信号量混乱?
【发布时间】:2016-07-07 23:16:14
【问题描述】:

所以,我正在编写一种类似于示波器的程序,它读取计算机上的串行端口并对这些数据执行 fft 以将其转换为频谱。我遇到了一个问题,虽然我的程序布局分为SerialHandler 类(使用boost::Asio)、FFTHandler 类和main 函数。 SerialHandler 类使用boost::Asio`` async_read_some 函数从端口读取并引发名为HandleOnPortReceive 的事件,然后该事件本身读取数据。

问题是我无法找到一种方法将由另一个线程上的io_service 对象引发的事件处理程序中的数据传递给另一个线程上的FFTHandler 类。有人建议我使用信号量来解决我的问题,但是我对 semaphore.h 的用法几乎一无所知,所以我的实现现在已经很糟糕了,并没有做任何应该做的事情。

下面是一些代码,如果这样可以让它更清晰一点:

using namespace Foo;
//main function      
int main(void){
     SerialHandler serialHandler;
     FFTHandler fftHandler;

     sem_t *qSem_ptr = &qSem;

     sem_init(qSem_ptr, 1, 0);

     //create separate threads for both the io_service and the AppendIn so that neither will block the user input statement following
     serialHandler.StartConnection(tempInt, tempString); //these args are defined, but for brevity's sake, I ommitted the declaration

     t2= new boost::thread(boost::bind(&FFTHandler::AppendIn, &fftHandler, q, qSem));

    //allow the user to stop the program and avoid the problem of an infinite loop blocking the program
    char inChar = getchar();
    if (inChar) {...some logic to stop reading}
}




namespace Foo{

    boost::thread *t1;
    boost::thread *t2;
    sem_t qSem;
    std::queue<double> q;
    boost::mutex mutex_;

    class SerialHandler{
    private:
        char *rawBuffer; //array to hold incoming data
        boost::asio::io_service ioService;
        boost::asio::serial_port_ptr serialPort;
    public:

            void SerialHandler::StartConnection(int _baudRate, string _comPort){
                //some functionality to open the port that is irrelevant to the question goes here

                AsyncReadSome();  //starts the read loop

                //create thread for io_service object and let function go out of scope
                t1 = new boost::thread(boost::bind(&boost::asio::io_service::run, &ioService)); 

            }

            void SerialHandler::AsyncReadSome(){

                //there's some other stuff here for error_catching, but this is the only important part
                serialPort->async_read_some (
                            boost::asio::buffer(rawBuffer, SERIAL_PORT_READ_BUF_SIZE),
                            boost::bind(
                                    &SerialHandler::HandlePortOnReceive,
                                    this, boost::asio::placeholders::error,
                                    boost::asio::placeholders::bytes_transferred, q));
           }

           void SerialHandler::HandlePortOnReceive(const boost::system::error_code& error, size_t bytes_transferred, std::queue<double>& q){
                boost::mutex::scoped_lock lock(mutex_);
                 //more error checking goes here, but I've made sure they aren't returning and are not the issue

                 for (unsigned int i =0; i<bytes_transferred; i++){
                    unsigned char c = rawBuffer[i];
                    double d = (double) c;  //loop through buffer and read
                    if (c==endOfLineChar){
                    } else  //if not delimiting char, push into queue and post semaphore
                    {
                            q.push(d);
                            //cout << d  << endl;
                            sem_post(&qSem);
                            cout << q.front() << endl;
                            cout << "size is: " << q.size() << endl;
                    }
                }
                //loop back on itself and start the next read
                AsyncReadSome();

            }
    }

    class FFTHandler{
    private:
        double *in; //array to hold inputs
        fftw_complex *out; //holds outputs
        int currentIndex;
        bool filled;
        const int N;
    public:


        void AppendIn(std::queue<double> &q, sem_t &qSem){
                while(1){  //this is supposed to stop thread from exiting and going out of scope...it doesn't do that at all effectively...
                    cout << "test" << endl;
                    sem_wait(&_qSem); //wait for data...this is blocking but I don't know why
                    double d = _q.front();
                    _q.pop();
                    in[currentIndex]=d; //read queue, pop, then append in array
                    currentIndex++;
                    if (currentIndex == N){ //run FFT if full and reset index
                            currentIndex = N-overlap-1;
                            filled = true;
                            RunFFT();
                    }
                }

         }
    }

}

FFTHandler::AppendIn(..) 中的调试行确实正在触发,因此正在创建线程,但它似乎立即超出范围并破坏了线程,因为似乎我已经设置了一段时间以错误地响应信号。

TLDR:这是一个很长的解释,简单地说,“我不了解信号量,但需要以某种方式实现它们。我尝试过,失败了,所以现在我来这里希望能收到请比我知识渊博的人帮忙处理这段代码。

更新:所以在玩弄了一些调试语句之后,似乎问题在于while(1){...} 语句确实在触发,但是sem_wait(&amp;_qSem); 导致它阻塞。无论出于何种原因,它都会无限期地等待,尽管信号量正在发布,但它会继续等待并且永远不会超出该线。

【问题讨论】:

  • 我看不到 信号量使用 有什么特别的问题,但我可以看到线程的潜在问题 - 例如,main 是否在线程完成之前退出? (从你刚才说的代码中不清楚“......停止阅读的一些逻辑”)
  • 为什么 SerialHandler 和 FFTHandler 在不同的线程上运行?如果目的是将工作传递给另一个线程以便 SerialHandler 可以接收下一组数据,那么您可能需要考虑领导者/跟随者模式并拥有一个运行 io_service 的线程池。在这里,每个线程都将使用 SerialHandler 读取数据并使用 FFTHandler 处理数据,一旦完成,将返回池中等待下一个 io 事件。
  • @aichao 我让 SerialHandler io_service 在一个单独的线程上运行,因为它在处理程序中循环回自身,因此它创建了一个无限循环。但是,我希望能够停止程序,并且不可停止的无限循环并不是一个理想的功能,所以我将它移到另一个线程,以便能够接受用户输入,最终在用户喜欢它时停止这个循环。出于类似的原因,FFTHandler::AppendIn(...) 函数位于其自己的线程上,即不阻塞主线程并释放用户以在程序运行时执行操作(即停止程序)。
  • 不要将boost::bindboost::thread 一起使用,这是多余的。只需将参数直接传递给boost::thread 构造函数。
  • @imp903 好的,我想我知道你现在想要做什么。我的评论的重点是,您可以将 io_service 用作事件队列(而不仅仅是 I/O 事件队列)并让多个线程处理事件,包括您现在拥有的 I/O 事件以及“队列已满” FFT”事件,然后由 FFTHandler 处理。单独的 I/O 和 FFT 事件可以在单独的线程上同时发生,因为一个线程读取更多数据,而另一个线程正在处理前一组数据;都没有阻塞主线程,这是你的意图。如果有兴趣,我可以发布答案。

标签: c++ multithreading boost-asio semaphore


【解决方案1】:

由于您已经在使用boost::mutex 及其作用域锁类型,我建议您使用boost::condition_variable 而不是POSIX 信号量。否则,您会将 C++11 风格的同步与 POSIX 同步混合。

您在添加到队列时锁定了互斥锁,但我没有看到任何锁定互斥锁以从队列中读取的内容。看起来您在互斥锁仍处于锁定状态时正在循环调用 AsyncReadSome

选择一种同步形式,然后正确使用它。

【讨论】:

  • 感谢您的建议,这似乎消除了一些困惑。我根据您的想法拉出信号量并使用 boost::condition_variable ,这似乎有效。虽然它引入了另一个问题(分段错误),但据我所知,这与信号和条件变量没有太大关系。
【解决方案2】:

信号量的初始值为 0,在这种情况下有效。所以它需要一个 sem_post 来解锁FFTHandler::AppendIn()。但是我没有看到第一次调用SerialHandler::AsyncReadSome() 以读取串行端口并推送到队列中的代码。如果你修复那部分代码,我认为 sem_post 会发生并且 FFTHandler 线程会运行。作为第一步,您可以在 sem_wait 和 AsyncReadSome() 函数之后进行调试打印,我的猜测是两者都不会被执行。

因此,本质上您会希望确保“读取”被启动并作为主线程或不同线程的一部分保持活动状态。

【讨论】:

  • 我很抱歉,这是我的错,我意识到我在复制代码时犯了一个错误。我在创建新线程 t1 之前调用了 AsyncReadSome()。我将更新上面的代码。但是,以防万一,我确实创建了一些调试行并且这些行正在触发,因此它正在执行 AsyncReadSome()。话虽如此,在尝试您的建议时,我发现了与信号量有关的其他有趣的东西,所以我也会将其添加到问题中。
  • 您在更新中提出的观点正是我的意思 :) 感谢您保持发布。
猜你喜欢
  • 2016-01-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-05-06
  • 1970-01-01
  • 2016-10-04
  • 2011-07-14
  • 1970-01-01
相关资源
最近更新 更多