【问题标题】:Synchronization using Pthreads mutex and conditional variables in C在 C 中使用 Pthreads 互斥锁和条件变量进行同步
【发布时间】:2015-06-03 07:50:25
【问题描述】:

我正在尝试创建两个类似于 TaskA 和 TaskB 的线程。 TaskA 和 TaskB 都做了某种计算,这对这篇文章来说不是很有趣。 TaskA 和 TaskB 必须执行 10 次才能覆盖整个数组。 TaskA 有一个输入 AA 和一个输出 BB。 BB 也是TaskB 的输入。 CC是TaskB的输出。因为 BB 由 taskA 写入并由 taskB 读取,所以我们需要互斥锁。

我想要实现的行为是当TaskA对i进行操作时,TaskB对i-1进行并行操作,其中i是处理的数组数量。 我想避免 TaskB 等待 TaskA 为每个 i 完成。

这里的问题是我有一个死锁。 ThreadA 和 ThreadB 代表 TaskA 和 TaskB。为了更容易,我删除了所有计算,只留下了同步指令。造成死锁的原因是 ThreadA 在 threadB 处于等待 CV[0] 的状态之前向条件变量 CV[0] 发出信号。

你知道有什么方法可以消除死锁,但没有 TaskA 等待 TaskB 完成,反之亦然。理想情况下,当 TaskA 对数组 i 进行操作时,TaskB 应该对数组 i-1 进行操作。

/* Includes */
#include <unistd.h>     /* Symbolic Constants */
#include <sys/types.h>  /* Primitive System Data Types */ 
#include <errno.h>      /* Errors */
#include <stdio.h>      /* Input/Output */
#include <stdlib.h>     /* General Utilities */
#include <pthread.h>    /* POSIX Threads */
#include <string.h>     /* String handling */
#include <semaphore.h>  /* Semaphore */
#include <stdint.h>

#define ARRAY_SIZE 2048*2400
#define DEBUG
//#define CHECK_RESULTS

pthread_mutex_t mutex[10];
pthread_cond_t cv[10];

/* prototype for thread routine */
void threadA ( void *ptr );
void threadB ( void *ptr );


struct thread_arg
{
    uint32_t *in;
    uint32_t *out;
    uint32_t ID;    
};

int main()
{

    pthread_t pthA;
    pthread_t pthB;
   //Memory allocation 
    uint32_t *AA = malloc(10*ARRAY_SIZE*sizeof(uint32_t)); 
    uint32_t *BB = malloc(10*ARRAY_SIZE*sizeof(uint32_t));
    uint32_t *CC = malloc(10*ARRAY_SIZE*sizeof(uint32_t)); 
    unsigned int j,i;

    // THread Arguments
    struct thread_arg arguments[2];
    arguments[0].in  = AA;
    arguments[0].out = BB;
    arguments[0].ID  = 1;
    arguments[1].in  = BB;
    arguments[1].out = CC;
    arguments[1].ID  = 2;
    //Init arguments data
    for (j=0;j<10;j++)
    {
        for (i=0;i<ARRAY_SIZE;i++)
        {
            AA[j*ARRAY_SIZE+i] = i;
            BB[j*ARRAY_SIZE+i] = 0;
            CC[j*ARRAY_SIZE+i] = 99 ; 
        }
    }   

    //Semaphore and conditional variables init
    for (i=0;i<10;i++){
        pthread_mutex_init(&mutex[i], NULL);
        pthread_cond_init (&cv[i], NULL);

    }

    pthread_create (&pthA, NULL, (void *) &threadA, (void *) &arguments[0]);
    pthread_create (&pthB, NULL, (void *) &threadB, (void *) &arguments[1]);

    pthread_join(pthA, NULL);
    pthread_join(pthB, NULL);

    // Destroy Semaphores and CVs
    for (i=0;i<10;i++)
    {
        pthread_mutex_destroy(&mutex[i]);
        pthread_cond_destroy(&cv[i]); 
    }       
    // Checking results 

    exit(0);
} /* main() */





void threadA ( void *ptr )
{
    int i; 
    struct thread_arg *arg = (struct thread_arg *) ptr;
    for (i=0;i<10;i++)
    {
        pthread_mutex_lock(&mutex[i]);
        printf("TA: LOCK_M%d  \n",i);
        pthread_cond_signal(&cv[i]);
        printf("TA: SIG_CV%d\n",i);
        pthread_mutex_unlock(&mutex[i]);
        printf("TA: UNL_M%d\n",i);
    }   
    pthread_exit(0); /* exit thread */
}




void threadB ( void *ptr )
{
    int i; 
    struct thread_arg *arg = (struct thread_arg *) ptr;

    for (i=0;i<10;i++)
    {
        pthread_mutex_lock(&mutex[i]);
        printf("TB: WAIT_CV%d\n",i,i);
        pthread_cond_wait(&cv[i], &mutex[i]);
        printf("TB CV%d_PASSED\n",i);
        pthread_mutex_unlock(&mutex[i]);
        printf("TB UNL_M%d \n",i);
    }

  pthread_exit(NULL);
}

【问题讨论】:

  • 由于 A 比 B“跑在前面”,您可以先锁定所有互斥锁,然后在 A 准备好后依次解锁每个互斥锁。那么,如果 B 可以得到锁 i,那么 A 必须完成该部分。
  • 据我所知,主要问题是您不了解 (a) 互斥体和 (b) 条件变量的目的。 cvar 只是一种信号机制。互斥锁不保护 cvar;它保护 谓词,该 cvar 被用作状态更改的信号。您似乎试图将 cvar 本身用作谓词。虽然只是有点相关,但看看如何使用 cvar/mtx 对in this example

标签: c pthreads mutex deadlock


【解决方案1】:

正如 WhozCraig 评论的那样,条件变量需要与某个共享状态的条件配对,称为 谓词。互斥体用于保护共享状态。

在此示例中,您的共享状态可能是一个整数,其中包含 ThreadA 生成的最高索引 BB[]。 ThreadB 然后等待这个数字到达它要读取的索引。在这种设计中,您只需要一个互斥锁和一个条件变量。全局变量将是:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
int BB_ready = -1;  /* Protected by 'mutex' */

(使用静态的PTHREAD_*_INITIALIZER 初始化器意味着您无需为pthread_*_init()pthread_*_destroy() 操心)。

ThreadA 中的循环将是:

for (i=0;i<10;i++)
{
    /* Process AA[i] into BB[i] here */

    /* Now mark BB[i] as ready */
    pthread_mutex_lock(&mutex);
    printf("TA: LOCK_M%d  \n",i);
    BB_ready = i;
    pthread_cond_signal(&cv);
    printf("TA: SIG_CV%d\n",i);
    pthread_mutex_unlock(&mutex);
    printf("TA: UNL_M%d\n",i);
}

..在 ThreadB 中:

for (i=0;i<10;i++)
{
    /* Wait for BB[i] to be ready */
    pthread_mutex_lock(&mutex);
    printf("TB: WAIT_CV%d\n",i);
    while (BB_ready < i)
        pthread_cond_wait(&cv, &mutex);
    printf("TB CV%d_PASSED\n",i);
    pthread_mutex_unlock(&mutex);
    printf("TB UNL_M%d \n",i);

    /* Now process BB[i] into CC[i] here */
}

请注意,只要共享状态发生更改,就会调用pthread_cond_signal(),这允许另一个线程唤醒并重新检查状态(如果它正在等待)。

等待线程总是循环,检查状态,如果状态还没有准备好,则等待条件变量。

【讨论】:

    猜你喜欢
    • 2011-03-05
    • 1970-01-01
    • 2020-12-07
    • 2018-03-05
    • 1970-01-01
    • 1970-01-01
    • 2012-06-30
    • 1970-01-01
    • 2015-11-12
    相关资源
    最近更新 更多