【问题标题】:Visual C++ and concurrency threads..why this happened?Visual C++ 和并发线程..为什么会这样?
【发布时间】:2013-06-18 08:21:49
【问题描述】:

我必须编写一个与并发线程一起工作的程序,用于处理 OpenCV Mat 图像。每个线程从队列中选择一个图像,对其进行处理并将结果放入另一个队列。我使用 Mat 图像的线程安全模板队列(如您在代码中所见)。

但是线程的奇怪行为是:如果我多次启动程序,每次我在单线程的详细说明数量中获得不同的结果(我插入的计数器“添加”到监视图像处理的数量单线程)。

第一个线程(零)总是做所有它的精化(在这个例子中,10),但其余的线程,不要。有时每个线程都进行 10 次细化,有时 3 次,有时 5...2.. 新更改(条件变量和关键部分)线程只执行 1 次操作。

我不知道问题出在哪里……为什么会这样。

我在这里传递了我的代码,请你检查一下,并告诉我你认为有什么问题......我很绝望。

这是代码:

#include <opencv\cv.h>
#include <opencv\highgui.h>
#include <opencv2\highgui\highgui.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#include <queue>

using namespace std;
using namespace cv;

/*thread safe queue*/

template<typename T>
class coda_concorr
{
private:
    std::queue<T> la_coda;
    HANDLE mutex;

public: 
    bool elemento;
    coda_concorr()
    {
        mutex = CreateMutex(NULL,FALSE,NULL);
        }
    ~coda_concorr()
    {}
    void push(T& data)
    {
        WaitForSingleObject(mutex,INFINITE);
        la_coda.push(data);
        ReleaseMutex(mutex);
        }
    bool vuota() const
    {
        WaitForSingleObject(mutex,INFINITE);
        bool RetCode = la_coda.empty();
        ReleaseMutex(mutex);
        return RetCode;
    }
    bool try_pop(T& popped)
    {
        WaitForSingleObject(mutex,INFINITE);
        while (la_coda.empty()){
            ReleaseMutex(mutex);
            return false;
        }
        WaitForSingleObject(mutex,INFINITE);
        popped = la_coda.front();
        la_coda.pop();
        ReleaseMutex(mutex);
        return true;
    }
};


struct Args
{
    coda_concorr<cv::Mat> in;
    coda_concorr<cv::Mat> *out; //puntatore a coda successiva
};


CONDITION_VARIABLE NonVuoto1;
CONDITION_VARIABLE NonVuoto2;
CONDITION_VARIABLE NonVuoto3;
CONDITION_VARIABLE NonVuoto4;
CRITICAL_SECTION  Lock1;
CRITICAL_SECTION  Lock2;
CRITICAL_SECTION  Lock3;
CRITICAL_SECTION  Lock4;

bool stop;

//initial populating queue
void puts (void* param){
    Args* arg = (Args*)param;
    int i=0;
    Mat image;

    while(!arg->in.vuota()){
        arg->in.try_pop(image);
        arg->out->push(image);
        i++;        
        WakeConditionVariable(&NonVuoto1);
        }
    //fine  
    cout<<endl<<"Thread (PUSH) terminato con "<<i<<" elaborazioni."<<endl;
    WakeConditionVariable(&NonVuoto1);
    _endthread();
}

//grey funct
void grey (void *param){
    Mat temp1,temp2;
    int add = 0;
    Args* arg = (Args*)param;
    while(true){
        EnterCriticalSection(&Lock1);
        //se vuoto
        while(arg->in.vuota() && !stop){
             SleepConditionVariableCS(&NonVuoto1,&Lock1,INFINITE);
            }
            if(stop==true){
            LeaveCriticalSection(&Lock1);
            break;
            }
        arg->in.try_pop(temp1);
        cvtColor(temp1,temp2,CV_BGR2GRAY);
        arg->out->push(temp2);
        add++;
        cout<<endl<<"grey ha fatto: "<<add<<endl;
        LeaveCriticalSection(&Lock1);
        WakeConditionVariable(&NonVuoto2);
        }
    //fine  
    cout<<endl<<"Thread (GREY) terminato con "<<add<<" elaborazioni."<<endl;
    _endthread();
}

//threshold funct
void soglia(void *param){
    Mat temp1a,temp2a;
    int add=0;
    Args* arg = (Args*)param;
    while(true){
        EnterCriticalSection(&Lock2);
        while(arg->in.vuota() && stop == false){
             SleepConditionVariableCS(&NonVuoto2,&Lock2,INFINITE);
            }
        if(stop==true){
            LeaveCriticalSection(&Lock2);
            break;
            }
        arg->in.try_pop(temp1a);
        threshold(temp1a,temp2a,128,255,THRESH_BINARY);
        arg->out->push(temp2a);
        add++;
        LeaveCriticalSection(&Lock2);
        WakeConditionVariable(&NonVuoto3);
        cout<<endl<<"soglia ha fatto: "<<add<<endl;
        }
        //fine 
     cout<<endl<<"Thread (SOGLIA) terminato con "<<add<<" elaborazioni."<<endl;
     _endthread();
}

//erode/dilate funct
void finitura(void *param){
    Mat temp1b,temp2b,temp2c;
    int add = 0;
    Args* arg = (Args*)param;
    //come consumatore
    while(true){
        EnterCriticalSection(&Lock3);
        while(arg->in.vuota() && stop == false){
             SleepConditionVariableCS(&NonVuoto3,&Lock3,INFINITE);
            }
        if(stop==TRUE){
            LeaveCriticalSection(&Lock3);
            break;
            }   
        arg->in.try_pop(temp1b);
        erode(temp1b,temp2b,cv::Mat());
        dilate(temp2b,temp2c,Mat());
        arg->out->push(temp2c);
        add++;
        LeaveCriticalSection(&Lock3);
        WakeConditionVariable(&NonVuoto4);
        cout<<endl<<"erode ha fatto: "<<add<<endl;
        }
     //fine 
     cout<<endl<<"Thread (ERODE) terminato con "<<add<<" elaborazioni."<<endl;
    _endthread();
}

//contour funct
void contorno (void *param){
    Mat temp;
    int add=0;
    Args* arg = (Args*)param;
    //come consumatore
    while(true){
        EnterCriticalSection(&Lock4);
        while(arg->in.vuota() && stop == false){
             SleepConditionVariableCS(&NonVuoto4,&Lock4,INFINITE);
            }
        if(stop==TRUE){
            LeaveCriticalSection(&Lock4);
            break;
            }   
    //esegue pop
    arg->in.try_pop(temp);
    //trova i contorni
    vector<vector<Point>> contorni;
    findContours(temp,contorni,CV_RETR_LIST, CV_CHAIN_APPROX_SIMPLE);
    //disegna i contoni in un'immagine
    Mat dst(temp.size(), CV_8UC3, Scalar(0,0,0));
    Scalar colors[3];
    colors[0] = Scalar(255,0,0);
    colors[1] = Scalar(0,255,0);
    colors[2] = Scalar(0,0,255);
    for (size_t idx = 0; idx < contorni.size(); idx++){
        drawContours(dst,contorni,idx,colors[idx %3]);
        }

    //come produttore
    arg->out->push(dst);
    add++;
    cout<<endl<<"cont ha fatto: "<<add<<endl;
    LeaveCriticalSection(&Lock4);
    }
    cout<<endl<<"Thread (CONTOUR) terminato con "<<add<<" elaborazioni."<<endl;
   _endthread();
}

//main
int main()
{

    coda_concorr<cv::Mat> ingresso;
    coda_concorr<cv::Mat> uscita;

    InitializeConditionVariable(&NonVuoto1);
    InitializeConditionVariable(&NonVuoto2);
    InitializeConditionVariable(&NonVuoto3);
    InitializeConditionVariable(&NonVuoto4);
    InitializeCriticalSection(&Lock1);
    InitializeCriticalSection(&Lock2);
    InitializeCriticalSection(&Lock3);
    InitializeCriticalSection(&Lock4);


    LARGE_INTEGER count1, count2, freq;
    double elapsed;


    Mat temp[10];
    Mat out;

    //dichiarazione code
    Args dati0,dati1,dati2,dati3,dati4;


    //avvio contatori
    QueryPerformanceFrequency(&freq);   
    QueryPerformanceCounter (&count1);

    for(int i=0;i<10;i++){
        temp[i] = imread("C:/OPENCV/Test/imgtest/bird1.jpg",1);
        ingresso.push(temp[i]);
    }

    //next queue pointer
    dati0.in=ingresso;
    dati0.out=&dati1.in;
    dati1.out=&dati2.in;
    dati2.out=&dati3.in;
    dati3.out=&dati4.in;
    dati4.out=&uscita;



    //handle
    HANDLE handle0,handle1,handle2,handle3,handle4;

    //start threads
    handle0 = (HANDLE) _beginthread(puts,0,&dati0);
    handle1 = (HANDLE) _beginthread(grey,0,&dati1);
    handle2 = (HANDLE) _beginthread(soglia,0,&dati2);
    handle3 = (HANDLE) _beginthread(finitura,0,&dati3);
    handle4 = (HANDLE) _beginthread(contorno,0,&dati4);

    cout<<endl<<"..Join dei threads..."<<endl;

    //join
    WaitForSingleObject(handle0,INFINITE);
    WaitForSingleObject(handle1,INFINITE);
    WaitForSingleObject(handle2,INFINITE);
    WaitForSingleObject(handle3,INFINITE);
    WaitForSingleObject(handle4,INFINITE);



    //chiusura contatori
    QueryPerformanceCounter (&count2);

    CloseHandle(handle0);
    CloseHandle(handle1);
    CloseHandle(handle2);
    CloseHandle(handle3);
    CloseHandle(handle4);

    elapsed = (count2.QuadPart - count1.QuadPart) * 1000.0 / freq.QuadPart;


    cout <<endl<<"Tempo di esecuzione approssimativo: " <<elapsed<<" ms."<<endl;
        system("PAUSE");
    return 0;
}

如果前一个线程将所有图像放入队列,为什么下一个线程不这样做?

我在 Windows 7 64bit 上使用 Visual C++ 2010 OpenCV 2.4.4

请帮我找出问题所在...

【问题讨论】:

  • 除此之外,您真的需要所有那些浪费 CPU、导致延迟的睡眠轮询循环吗?改为使用信号量来计算队列。
  • 如何使用信号量?有哪些改进?感谢您的宝贵时间。
  • 向队列类添加一个信号量(MSDN 'CreateSemaphore'),初始化计数为 0。推送时,锁定互斥体,推送,解锁互斥体,然后发出信号量,(MSDN '释放信号量')。弹出时,先等待信号量,WaitForSingleObject(queueSema, INFINITE),然后锁定互斥量,弹出数据,解锁互斥量。这种方案消除了队列计数轮询和延迟 - 每当信号量 WFSO 返回时,队列上肯定会有可用的对象。
  • “当 WFSO 返回时”是什么意思?对不起我的坚持,但直到最近我才不得不处理线程......你可以发布你所说的代码更改吗?提前感谢您的宝贵时间。
  • 'WFSO' - WaitForSingleObject'。 MS API 名称输入过多:)

标签: windows multithreading visual-c++ opencv


【解决方案1】:

您想要实现的似乎类似于工厂中的流水线工作,每个人都完成自己的工作,然后将其扔给下一个工人,直到完成所有工作。如果我错了,请纠正我。每个线程函数的成语是

void dowork(){
    while(noinput()){
        sleep(0.01);
    }
    while(getSomeInput()){
        processInput();
        queueResult();
    }
    displayAmountOfWorkDone();
}

您成功地使用互斥体提供互斥。您的设计的问题是,一旦一个线程观察到他的输入队列不为空,他就会消耗所有工作然后退出。由于线程调度和处理时间processInput(),工人可以以比他之前的工人更高的速率消耗他的输入。例如在 init 和 datit 之间排队,这是可能的:

datit: see 0, sleep
init : see 10 - add 1 
datit: see 1, process
datit: see 0, exit and outputs 1
init : add 2
init : add 3
init : add 4
....
init : add 10

您需要更改设计。 应该有不同的机制来表示工作已经结束。现在,您正在使用线程可以观察到的输入量。一个快速而肮脏的解决方法是给每个线程他期望处理的输入量,并将算法重写为

void dowork(){
    while(workIsnotDone()){//local test, no lock
        if(getSomeInput()){
             processInput();
             queueResult();
             updateWorkProgress();//local operation
        }
        else{
             sleep(0.01);
        }
    }
    displayAmountOfWorkDone();
}

更好的选择是将coda_concorr 类设置为生产者-消费者机制。为此,您可以添加一个条件变量。每个线程是一个队列上的消费者和另一个队列上的生产者。您还可以添加一个字段来明确指定不再输入的内容。看看this other question on SO

【讨论】:

  • 是的,你明白我的问题...这正是发生的事情...我该如何解决?如何解决这个问题?提前感谢您的宝贵时间。
  • 正是我需要第二个选项......每个线程都是一个队列的生产者和另一个队列的消费者......但是这样做是在你链接的问题中,我需要提升库为了力量?我在 Visual C++ 2010...
  • 你并没有改变putsgreyfiniturasogliacontourno 等背后的想法......他们仍然会消耗一些消息然后退出。您需要他们留下来寻找消息,直到触发外部条件。再看看我提出的函数dowork。
  • 我已经考虑过条件变量...我已经在 OP 中更新了我的代码,但是现在所有线程都在执行 1 次操作(出于明确原因,线程 0 除外)...为什么不工作?你能看看新代码吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-02-28
  • 1970-01-01
  • 1970-01-01
  • 2013-10-07
  • 1970-01-01
相关资源
最近更新 更多