【问题标题】:Condition variable - unexpected behaviour [closed]条件变量 - 意外行为
【发布时间】:2012-12-14 11:15:20
【问题描述】:

我正在尝试让我的程序做到这一点:

  • 接受输入:nrNodesNrWorkers
  • 3 个线程(workers)一次只能访问列表(读取),但是 只有 1 人可以写。
  • 当 5 个节点完成后(sqrt 值),它应该停止并让进来 清理链表中所有已完成节点的清理线程, 而不是它将访问权返回给剩余的工作线程

    线程1: 获取 semaphoreSlot(); 工作; 如果条件完成发送信号; 直到列表被解析;

    线程2: 等待信号; 干净的; 返回对thread1的访问;

    清洁后: 遇到的问题; 停止,不退出;

代码:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <math.h>
#include <semaphore.h>

#define doneLimit 5

struct dataBlock{
    struct node *root;
    int listSize;
    int forIndex;
};

struct node { // std linked list node
    int value;
    int worker;
    struct node *next;
};

int done = 0;

sem_t sem;

pthread_mutex_t mutp;   // mutex
pthread_cond_t  condvar;   //condvar

void *deleteDoneNodes(void *n){
    pthread_cond_wait( &condvar, &mutp );
    struct node *root = n;
    struct node *it = root;
    struct node *prev = NULL;

    printf("Cleaning 1's \n");   
        do{
            if(it->value == 1){
                struct node *next = it->next;
                if (prev != NULL) {
                    prev->next = next;
                }
                if (it == root) {
                    root = next;
                }
                free(it);
                it = next;
            }
            else {
                prev = it;
                it = it->next;
            }
        }while(it !=  NULL);
        done = 0;

    pthread_exit(NULL);
}

void * worker( void *data ){
    // get list
    int wFlag;
    struct dataBlock *inData = ( struct dataBlock * ) data;
    struct node *root = inData->root;
    int forIndex = inData->forIndex;
    free(data);

    // parse
    while(1){
        if( sem_wait( &sem )  != 0 ){
            printf( " > waiting...  \n" );
        }
        struct node *it = root;

        printf("    Thread >>>  %d  ---  %lu \n", forIndex, pthread_self() );
        do{
            wFlag = 0;
            pthread_mutex_lock( &mutp );
            if( forIndex == it->worker ){
                if( it->value > 2 ){

                    while( it->value != 1 ){
                        it->value = sqrt(it->value);
                        }
                        printf("! node done\n");
                    pthread_mutex_unlock( &mutp );
                    wFlag += 1;
                    done += 1;
                    if( done == doneLimit ){ // limit 5
                        pthread_cond_signal( &condvar );
                    }
                    break;
                }
            }
            else{
                printf("...parsed done node. \n");
            }
            it = it->next;
            pthread_mutex_unlock( &mutp );
        }while(it !=  NULL);

        sem_post(&sem); 
        sleep(1); // "create" concurrancy envi. 
        if ( wFlag == 0 ){
            break;
        }
    }
    pthread_exit(NULL);
}



int main( int argc, char *argv[] ){
    if ( argc != 3 ){
        printf( "Programm must be called with \n NR of elements and NR of workers! \n " );
        exit( 1 );
    }
    int i;
    struct node *root;
    struct node *iterator;  

//prepare list for task
    int listSize = atoi(argv[1]);
    int nrWorkers = atoi(argv[2]);
    root = malloc(sizeof( struct node) );

    root->value = rand() % 100;
    root->worker = 0;
    iterator = root;

    for( i=1; i<listSize; i++ ){
        iterator->next = malloc(sizeof(struct node));
        iterator = iterator->next;
        iterator->value = rand() % 100;
        iterator->worker = i % nrWorkers;
        printf("node #%d worker: %d  value: %d\n", i, iterator->worker,iterator->value);
    }
    iterator->next = NULL;
    printf("? List got populated\n");
// init semaphore > keeps max 3 threads working over the list

    if( sem_init(&sem,0,3) < 0){
      perror("semaphore initilization");
      exit(0);
    }

// Create all threads to parse the link list
    int ret;    
    pthread_mutex_init( &mutp,NULL );
    pthread_cond_init( &condvar, NULL );

    pthread_t w_thread;
    pthread_t* w_threads = malloc(nrWorkers * sizeof(w_thread));

    for( i=0; i < nrWorkers; i++ ){         
        struct dataBlock *data = malloc(sizeof(struct dataBlock));
        data->root = root;
        data->listSize = listSize;
        data->forIndex = i;
        ret = pthread_create ( &w_threads[i], NULL, worker, (void *) data );
        if( ret ) {
            perror("Worker creation fail \n");
            exit(2);    
        }   
    } 
// Create Cleaning thread
    pthread_t c_thread;
    ret = pthread_create(&c_thread, NULL, deleteDoneNodes, (void *) root);
    if ( ret ){
        printf("Cleaner cration fail \n");
    }

// Join threads
    for ( i = 0; i < nrWorkers; i++ ){
        pthread_join(w_threads[i],NULL);
    }

    iterator = root;
    for ( i = 0; i < listSize; i++){
        printf("val: %d  worker: %d _  \n", iterator->value, iterator->worker);
        iterator = iterator->next;
    }

    free( root );
    pthread_mutex_destroy( &mutp );
    pthread_cond_destroy( &condvar );
    sem_destroy( &sem );

    return 0;
}

附言: 有用 ./s 1 1 ./s 1 4 ./s 4 4 ./s 4 1

在清洁工调用后失败: 它在 ./s x n 处失败,当 x >= 5

【问题讨论】:

  • SO 不是代码审查网站。您有具体的技术问题吗?在这里发帖时,您应该更加小心,精简您的代码并写出语法正确。
  • @JensGustedt 我粘贴了所有内容,因为有人可能想要编译它,我想知道的是大约 3 行,是 cond var 做正确的事情还是我试图打印/valgrind 问题。
  • 我的问题不是你在这里粘贴代码。我的问题是您没有将代码简化为一个最小的示例,并且您无法制定一个精确的问题。很可能 99% 的代码与您的问题无关。

标签: c multithreading pthreads posix condition-variable


【解决方案1】:

至少你没有正确处理条件变量和互斥体的使用。

在调用 pthread_cond_wait() 之前,该互斥锁应被线程锁定。

man pthread_cond_wait逐字逐句:

pthread_cond_wait 原子地解锁互斥锁(根据 pthread_unlock_mutex)并等待条件变量 cond 变为 发出信号。线程执行被挂起并且不消耗任何 直到条件的 CPU 时间 变量发出信号。调用线程必须在进入 pthread_cond_wait 时锁定互斥锁。在返回之前 调用线程,pthread_cond_wait 重新获取互斥锁(根据 pthread_lock_mutex)。

从调用pthread_cond_wait()返回后,互斥锁也被锁定,并且必须在某个时候解锁。

【讨论】:

  • 谢谢。也许我累了,因为我弄错了操作和调用的顺序
猜你喜欢
  • 2017-11-03
  • 1970-01-01
  • 1970-01-01
  • 2016-06-26
  • 1970-01-01
  • 2021-01-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多