【发布时间】:2014-03-01 23:39:10
【问题描述】:
我想确定我了解条件变量的工作原理,因此我将使用我编写的程序来提问。
在我的程序中,我有一个“生产者”线程(一个)和“工作线程”(几个让我们假设3)。
生产者线程“处理”一个 FIFO 链表,意思是,它所做的只是检查开头是否有一个项目(在我的程序 request 中调用,类型为 Req)列表(在我的程序中由名为 front 的全局指针指向),如果是,则将其分配给 global request 元素(称为globalReq)。
工作线程循环运行,等待请求被处理,通过将全局请求变量提取到自己的局部变量中(即它们中的每一个都是“私有”的,因为每个线程都有一个独立的堆栈 - 如果我错了,请纠正我),然后处理请求。
为了做到这一点,我使用互斥锁和条件变量。
一个重要的注意事项是,一旦存在请求(暂时让我们假设只存在一个请求),哪个工作线程将“照顾”它并不重要(假设它们都是“免费的”) " - 在条件变量上休眠)。
在提取请求并将其分配到全局请求后,生产者线程调用pthread_cond_signal - 据我所知,它至少会解除阻塞一个“阻塞”线程 --> 因此它可以解除阻塞,例如2个线程。
所以我的问题是我拥有的当前代码(如下):
1) 我如何确保只有一个线程(来自工作线程)会处理请求。 我是否需要像所有通用的“生产者消费者”实现一样添加“while 检查循环”?
2)通过pthread_cond_broadcast(或者如果pthread_cond_signal解锁了多个线程),互斥体上的内容,我可能还没有掌握它。
(每一个)工作线程的代码是:
void *worker(void *arg)
{
while(1)
{
printf("\n BEFORE LOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));
pthread_mutex_lock(&sthread_mutex);
printf("\n AFTER UNLOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));
printf("\n BEFORE WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid));
pthread_cond_wait(&cond_var,&sthread_mutex); //wait on condition variable
printf("\n AFTER WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid));
printf("\n got signal for thread: %d \n",syscall(SYS_gettid));
// extract the current request into g local variable
// within the "private stack" of this thread
Req localReq = globalReq;
pthread_mutex_unlock(&sthread_mutex);
printf("\n AFTER UNLOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));
// perform the desired task
task(localReq);
printf("\n BEFORE calling sem_post with thread: %d \n", syscall(SYS_gettid));
sem_post(&sem);
} // end while (1)
} // end of worker thread function
生产者线程的代码是:
void *producer(void *arg)
{
while(1)
{
if(front != NULL) // queue not empty
{
// go to sleep if all "worker threads" are occuipied
// or decrement number of free "worker threads" threads by 1
printf(" schedualer thread BEFORE calling sem_wait on sem \n");
sem_wait(&sem);
// lock the sthread mutex in order to "synchronize" with the
// "worker threads"...
printf(" schedualer thread BEFORE locking sthread_mutex \n");
pthread_mutex_lock(&sthread_mutex);
printf(" schedualer thread AFTER locking sthread_mutex \n");
globalReq = extract_element(); // this is the global request variable
// notify the worker threads that an "arriving request" needed to
// be taking care of
printf(" schedualer thread BEFORE calling signal on cond_var \n");
// pthread_cond_signal(&cond_var);
pthread_cond_broadcast(&cond_var);
printf(" schedualer thread AFTER calling signal on cond_var \n");
// unlock the smutex
printf(" schedualer thread BEFORE UNLOCKING sthread_mutex \n");
pthread_mutex_unlock(&sthread_mutex);
printf(" schedualer thread AFTER UNLOCKING sthread_mutex \n");
} // queue not empty
else
continue;
} // end while (1)
} // end of producer
另一个问题:
生产者线程在全局信号量(在开始时以工作线程数初始化,在本例中为3)上调用sem_wait,以便为自己指明当前有多少工作线程正在处理一个请求,并且,为了完成这个“机制”,工作线程一旦处理完他们“赢得”的请求(当竞争条件变量时),调用sem_post表示“另一个工作线程可用”
3) 这是实现这种“指示有多少可用工作线程”的适当(良好/有效)方式吗?
4) 通过在 //* 段落中提到的生产者和工作线程之间共享的全局变量“传递”请求的好处和坏处是什么?传递它是一种明智的方式,还是最好“只”创建一个“新请求变量”(在堆上使用 malloc),它将为每个工作线程和请求“专用”(并在其中释放它)每个工作线程完成服务请求后)?
5) 随时向我指出您可能想到的有关这段代码的任何其他 cmets(好或坏)。
编辑:
大家好,
一些额外的问题:
除了生产者和工作线程之外,还有另一个线程,叫做listener,它的唯一任务就是将到达的请求插入到链表(FIFO队列)中,所以它真的不是之前提到的制作人的任务。
所以我的新问题是:
8) 有了关于我的程序的额外信息,我用信号量组成的“信号机制”是否有效?
9) 生产者和监听线程管理的链表有两个全局指针front和rear分别指向链表的头和尾(链表头是第一个要处理的请求)。
下面是监听线程执行的插入功能的实现,以及生产者线程执行的“提取”功能。
为了在“队列”(链表)上同步这两个线程,我使用了一个在它们之间共享的互斥锁,称为 qmutex。
我的问题是,关于下面的 2 个代码,在每个函数中“放置”互斥锁(锁定和解锁)的“最佳”位置在哪里?
感谢分配,
伙计。
插入函数:
void insertion(void *toInsert)
{
struct getInfo *req = (struct getInfo *)toInsert;
newNode = (N*)malloc(sizeof(N));
newNode->req = req;
newNode->next = NULL;
// WHERE SHULD I LOCK (AND UNLOCK) THE QUEUE MUTEX ????????????????????????
if(front == NULL)
{
front = newNode;
printf("empty list - insert as head \n");
}
else
{
rear->next = newNode;
printf(" NOT AN EMPTY list - insert as last node \n");
}
rear = newNode;
} // end of insertion
提取函数:
Req extract_element()
{
if(front == NULL)
printf("\n empty queue \n");
else
{
Req ret;
tmpExtract = front;
ret.socketNum = tmpExtract->req->socketNum;
ret.type = tmpExtract->req->type;
printf("\n extracting node with sockNum: %d \n",ret.socketNum);
front = front->next;
free(tmpExtract);
return(ret);
}
} // end of extract_element
【问题讨论】:
标签: c multithreading pthreads mutex condition-variable