【问题标题】:OpenCV multithreading (Windows/.NET) delays several seconds from video captureOpenCV 多线程 (Windows/.NET) 从视频捕获延迟几秒钟
【发布时间】:2012-02-25 20:58:20
【问题描述】:

我有一个多线程的 openCV 程序,它使用 4 个线程来执行以下操作:

线程1->调用cvQueryFrame(),它从相机中一张一张地抓取帧图像并将它们存储到std::vectorinputBuffer

线程 2->对 inputBuffer[0] 执行阈值处理,将结果复制到另一个名为 filterOutputBufferstd::vector

线程 3->执行光流算法/为 filterOutputBuffer 中的前两个元素绘制流场,将结果复制到另一个名为 std::vectorofOutputBuffer

线程4->使用cvShowImage(ofOutputBuffer[0])显示图像

所以基本上我设想每个线程在相应输入向量/缓冲区的第一个元素上执行任务,并将结果存储在相应输出向量的后面。有点像 3 个工厂工人在流水线上尽自己的一份力,然后将最终结果扔进一个桶里给下一个人。

我为所有缓冲区设置了互斥体,程序正常工作,只有输出从实时摄像机流延迟了几秒钟。

我运行了同一程序的非多线程版本(使用了一个巨大的 while(true) 循环),它实时运行,只是偶尔卡顿。

为什么我的并发实现在性能上有这么大的延迟?

以下是线程函数:

    void writeBuffer()
    {
        cout << "Thread " << GetCurrentThreadId() << ": Capturing frame from camera!" << endl;
        CvCapture *capture = 0;
        IplImage *frame = 0;
        DWORD waitResult;

        if (!(capture = cvCaptureFromCAM(0)))
            cout << "Cannot initialize camera!" << endl;

        //now start grabbing frames and storing into the vector inputBuffer
        while (true)
        {
            //cout << "Thread " << GetCurrentThreadId() << ": Waiting for mutex to write to input buffer!..." << endl;
            waitResult = WaitForSingleObject(hMutex, INFINITE);
            switch(waitResult) 
            {
                // The thread got ownership of the mutex
                case WAIT_OBJECT_0:
                    frame = cvQueryFrame(capture); //store the image into frame
                    if(!frame)
                    {
                        cout << "Thread " << GetCurrentThreadId() << ": Error capturing frame from camera!" << endl;
                    }
                    //cout << "Thread " << GetCurrentThreadId() << ": Getting Frame..." << endl;
                    inputBuffer.push_back(*frame);
                break; 
                default:
                    cout << "Thread " << GetCurrentThreadId() << ": Error acquiring mutex..." << endl;
            }
            if(!ReleaseMutex(hMutex)) 
            { 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing mutex..." << endl;
            }
            //else cout << "Thread " << GetCurrentThreadId() << ": Done writing to input buffer, Mutex Released!" << endl;
            //signal hDoneGettingFrame
            PulseEvent(hDoneGettingFrame);
        }
            cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
    }


    void opticalFlow()
    {
    ...
        DWORD waitResult;

        //start grabbing frames from the vector inputBuffer
        cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl;
        while(true)
        {
            waitResult = WaitForSingleObject(fMutex, INFINITE);
            switch(waitResult) 
            {
                // The thread got ownership of the mutex
                case WAIT_OBJECT_0: 
                    //grab first two frames from buffer (inputBuffer[0-1]) and process them
                    if(filterOutputBuffer.size() > 1)
                    {   
                        frame1 = filterOutputBuffer[0];
                        frame2 = filterOutputBuffer[1];
                        filterOutputBuffer.erase(filterOutputBuffer.begin());
                    }
                    else 
                    {
                        if(!ReleaseMutex(fMutex)) 
                            cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl;
                        //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl;
                        continue;
                    }
                break; 
                default:
                    cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl;
                    continue;
            }
            if(!ReleaseMutex(fMutex)) 
            { 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
            }
    ...
    //Do optical flow stuff
    ...
    waitResult = WaitForSingleObject(oMutex, INFINITE);
            switch(waitResult)
            {
                // The thread got ownership of the mutex
                case WAIT_OBJECT_0:
                    //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl;
                    ofOutputBuffer.push_back(*frame1_3C);
                break;
                default:
                    cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl;
            }
            if(!ReleaseMutex(oMutex)) 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl;
    }
        cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
    }

    void filterImage()
{
    DWORD waitResult;
...

    //start grabbing frames from the vector inputBuffer
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl;
    while(true)
    {
        waitResult = WaitForSingleObject(hMutex, INFINITE);
        switch(waitResult) 
        {
            // The thread got ownership of the mutex
            case WAIT_OBJECT_0: 
                //grab first frame and then release mutex
                if(inputBuffer.size() > 0)
                {   
                    frame = inputBuffer[0];
                    inputBuffer.erase(inputBuffer.begin());
                }
                else 
                {
                    if(!ReleaseMutex(hMutex)) 
                        cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
                    //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl;
                    continue;
                }
            break; 
            default:
                cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl;
                continue;
        }
        if(!ReleaseMutex(hMutex)) 
        { 
            cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
        }
...
//Tresholding Image Stuff
...
        //cout << "Thread " << GetCurrentThreadId() << ": Waiting to write to output buffer..." << endl;
        waitResult = WaitForSingleObject(fMutex, INFINITE);
        switch(waitResult)
        {
            // The thread got ownership of the mutex
            case WAIT_OBJECT_0:
                //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl;
                filterOutputBuffer.push_back(*out);
            break;
            default:
                cout << "Thread " << GetCurrentThreadId() << ": Error acquiring filter mutex..." << endl;
        }
        if(!ReleaseMutex(fMutex)) 
            cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl;

    }
}

void displayImage()
{
    DWORD waitResult;
    IplImage final;
    int c;
    cvNamedWindow("Image", CV_WINDOW_AUTOSIZE);
    //start grabbing frames from the vector ouputBuffer
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from output buffer..." << endl;
    while (true)
    {
            waitResult = WaitForSingleObject(oMutex, INFINITE);
            switch(waitResult) 
            {
                    // The thread got ownership of the mutex
                    case WAIT_OBJECT_0:
                        if(ofOutputBuffer.size() > 0)
                        {
                            //cout << "Thread " << GetCurrentThreadId() << ": Reading output buffer..." << endl;
                            final = ofOutputBuffer[0];
                            ofOutputBuffer.erase(ofOutputBuffer.begin());
                        }
                        else 
                        {
                            if(!ReleaseMutex(oMutex)) 
                                cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl;
                            //else cout << "Thread " << GetCurrentThreadId() << ": Output Buffer is empty!" << endl;
                            continue;
                        }
                    break;
                    default:
                        cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl;
                        continue;
            }
            if(!ReleaseMutex(oMutex)) 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
            //else cout << "Thread " << GetCurrentThreadId() << ": Done reading output buffer, mutex Released!" << endl;

            //cout << "Thread " << GetCurrentThreadId() << ": Displaying Image..." << endl;
            cvShowImage("Image", &final);
            c = cvWaitKey(1);
    }
    cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
}

这里是主要功能:

void main()
{
    hMutex = CreateMutex(NULL, FALSE, NULL);
    oMutex = CreateMutex(NULL, FALSE, NULL);
    fMutex = CreateMutex(NULL, FALSE, NULL);

    hDoneGettingFrame = CreateEvent(NULL, TRUE, FALSE, NULL);
    hDoneReadingFrame = CreateEvent(NULL, TRUE, FALSE, NULL);

    TName[0]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)writeBuffer, NULL, 0, &ThreadID);
    TName[1]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)filterImage, NULL, 0, &ThreadID);
    TName[2]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)opticalFlow, NULL, 0, &ThreadID);
    TName[3]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)displayImage, NULL, 0, &ThreadID);
    WaitForMultipleObjects(4, TName, TRUE, INFINITE);
    CloseHandle(TName);
}

【问题讨论】:

    标签: .net windows multithreading opencv


    【解决方案1】:

    多线程应用程序在线程之间共享 CPU 时间。因此,当另一个线程想要处于运行状态时,存在上下文切换。可能,线程之间的切换会增加 CPU 时间,从而导致应用程序变慢。

    【讨论】:

      【解决方案2】:

      尝试使用threadPool,它将最大限度地减少CPU在线程之间传输所消耗的时间。

      【讨论】:

      • 也许如果我将阈值和光流移动到同一个线程中,可以最大限度地减少上下文切换?
      【解决方案3】:

      好吧,首先,如果我将您的第一个线程函数的循环滚动一下:

      if(!ReleaseMutex(hMutex)){} 
      PulseEvent(hDoneGettingFrame);
      waitResult = WaitForSingleObject(hMutex, INFINITE);
      

      换句话说,您的第一个线程几乎在第一个线程循环的整个运行过程中都持有队列互斥锁,从而防止第二个线程到达任何地方。我猜所有其他线程的代码都是一样的?

      当在管道中向生产者-消费者队列推送数据时,您应该只将队列锁定最少的时间。对缓冲区对象进行处理,然后锁定队列,推送对象引用,然后立即解锁队列。然后发出信号量(或类似的信号),以便下一个线程可以处理该对象。

      不要让队列保持锁定状态!互斥锁不应该在那里等待其他线程等待工作 - 这是为了保护队列免受多次访问。您需要一些其他信号来维持队列计数并让线程等待工作。无论您在网上看到多少其他示例,都不要为此使用事件 - 如果您必须滚动自己的生产者-消费者队列,请使用信号量。

      更好 - 使用已经工作的 P-C 队列类 - 查看 BlockingCollections 类。

      【讨论】:

        【解决方案4】:

        信号量成功了!我没有使用单独的互斥锁,而是创建了一个信号量并让所有线程通过它工作。

        谢谢,现在运行起来又快又流畅!

        void main()
        {
            hSemaphore = CreateSemaphore( 
                NULL,           // default security attributes
                MAX_THREADS,  // available count (when a thread enters, it decreases)
                MAX_THREADS,  // maximum count
                NULL);          // unnamed semaphore
        
            if (hSemaphore == NULL) 
            {
                printf("CreateSemaphore error: %d\n", GetLastError());
                return;
            }
        
        
            TName[0]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)writeBuffer, NULL, 0, &ThreadID);
            TName[2]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)opticalFlow, NULL, 0, &ThreadID);
            TName[3]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)displayImage, NULL, 0, &ThreadID);
            WaitForMultipleObjects(4, TName, TRUE, INFINITE);
            CloseHandle(TName);
        }   
        

        在线程中......

           //instead of separate Mutexes, just wait for semaphore
            waitResult = WaitForSingleObject(hSemaphore, INFINITE);
                        switch(waitResult) 
                        {
        
                                    ...
        
                                    }
                        if(!ReleaseSemaphore(hSemaphore, 1, NULL)) 
                            cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
        

        【讨论】:

        • 不确定你做了什么,在这里。每个线程间队列都需要自己的信号量来计算队列项,并需要自己的互斥体来保护队列免受多次访问。如果有 5 个线程,它们之间有 4 个阶段,则需要 4 个队列、4 个信号量和 4 个互斥锁。
        • 我创建了一个信号量,所有线程都进入完成工作,然后离开。如果我必须使用互斥锁,信号量的目的到底是什么?
        猜你喜欢
        • 1970-01-01
        • 2021-06-23
        • 2015-03-27
        • 2022-01-18
        • 2018-04-08
        • 2019-10-25
        • 1970-01-01
        • 1970-01-01
        • 2014-06-03
        相关资源
        最近更新 更多