【问题标题】:Proper Condition variables usage适当的条件变量使用
【发布时间】:2014-03-01 23:39:10
【问题描述】:

我想确定我了解条件变量的工作原理,因此我将使用我编写的程序来提问。

在我的程序中,我有一个“生产者”线程(一个)和“工作线程”(几个让我们假设3)。

生产者线程“处理”一个 FIFO 链表,意思是,它所做的只是检查开头是否有一个项目(在我的程序 request 中调用,类型为 Req)列表(在我的程序中由名为 front 的全局指针指向),如果是,则将其分配给 global request 元素(称为globalReq)。

工作线程循环运行,等待请求被处理,通过将全局请求变量提取到自己的局部变量中(即它们中的每一个都是“私有”的,因为每个线程都有一个独立的堆栈 - 如果我错了,请纠正我),然后处理请求。

为了做到这一点,我使用互斥锁和条件变量。

一个重要的注意事项是,一旦存在请求(暂时让我们假设只存在一个请求),哪个工作线程将“照顾”它并不重要(假设它们都是“免费的”) " - 在条件变量上休眠)。

在提取请求并将其分配到全局请求后,生产者线程调用pthread_cond_signal - 据我所知,它至少会解除阻塞一个“阻塞”线程 --> 因此它可以解除阻塞,例如2个线程。

所以我的问题是我拥有的当前代码(如下):

1) 我如何确保只有一个线程(来自工作线程)会处理请求。 我是否需要像所有通用的“生产者消费者”实现一样添加“while 检查循环”?

2)通过pthread_cond_broadcast(或者如果pthread_cond_signal解锁了多个线程),互斥体上的内容,我可能还没有掌握它。

(每一个)工作线程的代码是:

void *worker(void *arg)
{

       while(1)
       {
         printf("\n BEFORE LOCKING sthread_mutex  with thread: %d \n", syscall(SYS_gettid));
                 pthread_mutex_lock(&sthread_mutex);
         printf("\n AFTER UNLOCKING sthread_mutex  with thread: %d \n", syscall(SYS_gettid));

         printf("\n BEFORE WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid));           
                 pthread_cond_wait(&cond_var,&sthread_mutex); //wait on condition variable
         printf("\n AFTER WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid));            

                 printf("\n got signal for thread: %d \n",syscall(SYS_gettid));

         // extract the current request into g local variable 
             // within the "private stack" of this thread   
                 Req localReq = globalReq;  
                 pthread_mutex_unlock(&sthread_mutex);
         printf("\n AFTER UNLOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));

            // perform the desired task
                   task(localReq);
           printf("\n BEFORE calling sem_post with thread: %d \n", syscall(SYS_gettid));
           sem_post(&sem);

       } // end while (1)

} // end of worker thread function

生产者线程的代码是:

void *producer(void *arg)
{
       while(1)
       {

              if(front != NULL)  // queue not empty  
              {
              // go to sleep if all "worker threads" are occuipied 
              // or decrement number of free "worker threads" threads by 1
              printf(" schedualer thread BEFORE calling sem_wait on sem \n");
                          sem_wait(&sem);

              // lock the sthread mutex in order to "synchronize" with the
              // "worker threads"...
              printf(" schedualer thread BEFORE locking sthread_mutex \n");
                          pthread_mutex_lock(&sthread_mutex);
              printf(" schedualer thread AFTER locking sthread_mutex \n");


                          globalReq = extract_element(); // this is the global request variable 

              // notify the worker threads that an "arriving request" needed to 
              // be taking care of
              printf(" schedualer thread BEFORE calling signal on cond_var \n");
                          // pthread_cond_signal(&cond_var);
              pthread_cond_broadcast(&cond_var);
              printf(" schedualer thread AFTER calling signal on cond_var \n");

              // unlock the smutex
              printf(" schedualer thread BEFORE UNLOCKING sthread_mutex \n");
                          pthread_mutex_unlock(&sthread_mutex);
              printf(" schedualer thread AFTER UNLOCKING sthread_mutex \n");

               }  // queue not empty  

               else
                    continue;


        }  // end while (1)

 } // end of producer

另一个问题:

生产者线程在全局信号量(在开始时以工作线程数初始化,在本例中为3)上调用sem_wait,以便为自己指明当前有多少工作线程正在处理一个请求,并且,为了完成这个“机制”,工作线程一旦处理完他们“赢得”的请求(当竞争条件变量时),调用sem_post表示“另一个工作线程可用”

3) 这是实现这种“指示有多少可用工作线程”的适当(良好/有效)方式吗?

4) 通过在 //* 段落中提到的生产者和工作线程之间共享的全局变量“传递”请求的好处和坏处是什么?传递它是一种明智的方式,还是最好“只”创建一个“新请求变量”(在堆上使用 malloc),它将为每个工作线程和请求“专用”(并在其中释放它)每个工作线程完成服务请求后)?

5) 随时向我指出您可能想到的有关这段代码的任何其他 cmets(好或坏)。

编辑:

大家好,

一些额外的问题:

除了生产者和工作线程之外,还有另一个线程,叫做listener,它的唯一任务就是将到达的请求插入到链表(FIFO队列)中,所以它真的不是之前提到的制作人的任务。

所以我的新问题是:

8) 有了关于我的程序的额外信息,我用信号量组成的“信号机制”是否有效?

9) 生产者和监听线程管理的链表有两个全局指针frontrear分别指向链表的头和尾(链表头是第一个要处理的请求)。

下面是监听线程执行的插入功能的实现,以及生产者线程执行的“提取”功能。

为了在“队列”(链表)上同步这两个线程,我使用了一个在它们之间共享的互斥锁,称为 qmutex。

我的问题是,关于下面的 2 个代码,在每个函数中“放置”互斥锁(锁定和解锁)的“最佳”位置在哪里?

感谢分配,

伙计。

插入函数:

void insertion(void *toInsert)
{

    struct getInfo *req = (struct getInfo *)toInsert;
        newNode = (N*)malloc(sizeof(N));
        newNode->req = req;
        newNode->next = NULL;

    // WHERE SHULD I LOCK (AND UNLOCK) THE  QUEUE MUTEX ????????????????????????
        if(front == NULL)
    {
        front = newNode;
        printf("empty list - insert as head \n");     
    }

        else
    {
        rear->next = newNode;
        printf(" NOT AN EMPTY list - insert as last node \n");
    }

        rear = newNode;
}  // end of insertion

提取函数:

Req extract_element()
{

        if(front == NULL)
            printf("\n empty queue \n");
        else
        {
                Req ret;
                tmpExtract = front;
            ret.socketNum = tmpExtract->req->socketNum;
            ret.type = tmpExtract->req->type;
                printf("\n extracting node with sockNum: %d \n",ret.socketNum);
                front = front->next;
                free(tmpExtract);
                return(ret);
        }
} // end of extract_element

【问题讨论】:

    标签: c multithreading pthreads mutex condition-variable


    【解决方案1】:

    与其直接回答您的问题,不如先介绍一下典型的解决方法:

    您有一个队列或列表,您可以在其中添加工作数据。每当您添加一组工作数据时,您首先锁定一个互斥锁,添加数据,发出条件变量信号,然后解锁互斥锁。

    您的工作线程然后锁定互斥锁,并在队列为空时循环等待条件。当信号发出时,一个或多个工作人员会被唤醒,但只有一个(一次)会抢到互斥锁。锁定互斥锁后,“获胜者”检查队列中是否有东西,提取它,解锁互斥锁,并执行必要的工作。解锁互斥锁后,其他线程也可能会唤醒(如果条件被广播,也会唤醒),并且将从队列中提取下一个工作,或者如果队列为空,则返回等待。

    在代码中,它看起来有点像这样:

    #include <pthread.h>
    #include <unistd.h>
    #include <stdio.h>
    
    #define WORKER_COUNT 3
    
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    pthread_t workers[WORKER_COUNT];
    
    static int queueSize = 0;
    
    static void *workerFunc(void *arg)
    {
        printf("Starting worker %d\n", (int)arg);
        while(1) {
            pthread_mutex_lock(&mutex);
            while(queueSize < 1) {
                pthread_cond_wait(&cond, &mutex);
            }
            printf("Worker %d woke up, processing queue #%d\n", (int)arg, queueSize);
            //Extract work from queue
            --queueSize;
            pthread_mutex_unlock(&mutex);
            //Do work
            sleep(1);
        }
    }
    
    int main()
    {
        int i;
    
        pthread_mutex_init(&mutex, 0);
        pthread_cond_init(&cond, 0);
    
        for(i=0; i<WORKER_COUNT; ++i) {
            pthread_create(&(workers[i]), 0, workerFunc, (void*)(i+1));
        }
    
        sleep(1);
        pthread_mutex_lock(&mutex);
        //Add work to queue
        queueSize = 5;
        pthread_cond_broadcast(&cond);
        pthread_mutex_unlock(&mutex);
    
        sleep(10);
    
        return 0;
    }
    

    (我没有在线程之后进行清理,将工人编号传递给线程既快速又脏,但在这种情况下有效)。

    在这里,worker 将被pthread_cond_broadcast() 唤醒,只要队列中有东西就会运行(直到queueSize 回到 0 - 假设还有一个实际队列),然后返回等待。

    回到问题:

    1:互斥锁和保护变量(这里是queueSize)负责这个。您还需要保护变量,因为您的线程可能也因其他原因而被唤醒(所谓的虚假唤醒,请参阅http://linux.die.net/man/3/pthread_cond_wait)。

    2:如果您调用pthread_mutex_lock(),唤醒的线程会像任何其他线程一样争夺互斥锁。

    3:我不确定您为什么需要将可用工作线程的数量发回给生产者?

    4:您的生产者和消费者都需要可以访问队列 - 但仍然可以通过各种方式使用函数(或类,如果您使用 C++)进行封装。

    5:我希望以上就足够了?

    6:pthread_cond_wait() 的问题在于它可能会产生虚假唤醒。也就是说,即使您没有发出条件信号,它也可能会唤醒。因此,您需要一个保护变量(在我的代码示例中,pthread_cond_wait() 周围的 while() 循环),以确保在 pthread_cond_wait() 返回时确实有唤醒的理由。然后,您使用与条件使用相同的互斥锁来保护保护变量(以及您需要提取的任何工作数据),然后您可以确定只有一个线程会完成每一项工作。

    7:我不会让生产者进入睡眠状态,而是让它将可以提取的任何数据添加到工作队列中。如果队列已满,那么它应该进入睡眠状态,否则它应该继续添加东西。

    8:对于您的 Listener 线程,我真的不明白为什么您甚至需要您的 Producer 线程。为什么不让工人自己打电话给extract_element()

    9:您需要保护对列表变量的所有访问。也就是说,在insertion() 中,在您第一次访问front 之前锁定互斥锁,并在您最后一次访问rear 之后解锁它。 extract_element() 中的相同 - 尽管您需要重写函数以在队列为空时也具有有效的返回值。

    【讨论】:

    • @sonicwave:首先感谢您的回答,确实很棒。如果您能回答我的其他问题,那就太好了!
    • 您应该只编辑您的原始问题并添加额外的问题,而不是将它们添加到答案中 - 我已经编辑了我的答案以包含那些(6 和 7)的答案。
    • @sonicwave:再次感谢您的回答,我听取了您关于编辑我的问题的建议。如果可以,请查看有关我的程序的附加信息。谢谢!!
    • @sonicwave:再次感谢分配,我喜欢它,因为我喜欢所有其他答案!我如何将其标记为已接受(我没有 enoghu 声誉来“喜欢”答案 - 我还能做其他事情吗?)
    • 点击上/下投票按钮正下方的复选标记轮廓 - stackoverflow.com/help/someone-answers
    【解决方案2】:

    想确定上一期和另一件事,所以新的问题是:

    1. 如果我仍然想“坚持”我的线程实现,即使用我写的互斥锁和条件变量,我怎么能确定当生产者线程调用 pthread_cond_signal 时,只有一个线程会继续运行(来自 pthread_cond_wait 之后的指令)?
      我是否需要添加另一个检查或使用另一个变量,或者(如我所愿)只使用 pthread_cond_wait/signal 的通用机制?

    注意:我使用 pthread_cond_broadcast 是为了“模拟” pthread_cond_signal 解除对多个线程的阻塞的情况。

    1. 澄清我的程序“逻辑”:

    生产者线程在分配请求时减少信号量的原因,相反,工作线程增加它的值,是为了让生产者线程在所有工作线程都忙的情况下在信号量上“进入睡眠状态” - -> 意思是“等待提取和分配过程”,直到(至少)一个工作线程可用于处理请求。 这是我想要实现的一个很好的实现,还是有更好的方法?

    【讨论】:

      猜你喜欢
      • 2018-12-12
      • 2020-01-09
      • 2019-05-31
      • 2023-02-03
      • 2012-10-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多