【发布时间】:2023-03-19 03:06:01
【问题描述】:
在下面的代码中,每个线程运行sell_tickets() 函数,获取互斥锁并减少票数。为了限制活动线程数,checkin() 和checkout() 线程是使用信号量的生产者-消费者模型。
但在单元测试中,似乎消费者函数checkout() 在生产者checkin() 之前调用pthread_join() 函数,甚至创建新线程。那么有什么问题吗?是不是因为线程不共享栈内存段,因为我没有malloc()堆空间给一些参数。
/**
* Using semaphore to limit maximum thread number.
*/
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
/**
* If arguments that pthread_create() pass to sell_tickets() are more than one,
* they need to be wrapped into a single structure.
*/
typedef struct {
unsigned agent_id; // simulate an agent
unsigned tickets_tosell; // agent's personal goal of the day
pthread_mutex_t *pool_lock; // mutex lock for visiting the shared tickets pool
unsigned *tickets_pool; // shared tickets pool
} agent;
/**
* Constructor
*/
static void new_agent(agent *a, unsigned agentid, unsigned ticketsnum, pthread_mutex_t *lock, unsigned *tickets_pool) {
a->agent_id = agentid;
a->tickets_tosell = ticketsnum;
a->pool_lock = lock;
a->tickets_pool = tickets_pool;
}
/**
* Implement void *(*start_rtn)(void *);
* -------------------------------------
* Each thread execute this function.
*/
static void *sell_tickets(void *agent_addr) {
agent *a = (agent *)agent_addr;
while (a->tickets_tosell > 0) {
pthread_mutex_lock(a->pool_lock);
(*a->tickets_pool)--;
printf("agent@%d sells a ticket, %d tickets left in pool.\n", a->agent_id, *a->tickets_pool);
pthread_mutex_unlock(a->pool_lock);
a->tickets_tosell--;
printf("agent@%d has %d tickets to sell.\n", a->agent_id, a->tickets_tosell);
}
pthread_exit((void *)&a->agent_id);
}
typedef struct {
/* shared threads pool */
pthread_t *pool_addr;
unsigned pool_size;
/* a pair of producer-consumer semaphore */
sem_t *producer_sem;
sem_t *consumer_sem;
/* producer & consumer thread id */
pthread_t *producer_tid;
pthread_t *consumer_tid;
} threads_pool;
static void new_threads_pool(threads_pool *tp,
pthread_t *pool_addr, unsigned pool_size,
sem_t *producer_sem, sem_t *consumer_sem,
pthread_t *ptid, pthread_t *ctid) {
tp->pool_addr = pool_addr;
tp->pool_size = pool_size;
tp->producer_sem = producer_sem;
tp->consumer_sem = consumer_sem;
tp->producer_tid = ptid;
tp->consumer_tid = ctid;
}
typedef struct {
sem_t *checkin_b;
sem_t *checkout_b;
} barrier;
static void new_barrier(barrier *b, sem_t *inb, sem_t *outb) {
b->checkin_b = inb;
b->checkout_b = outb;
}
typedef struct {
unsigned num_agents;
unsigned num_tickets;
} project;
static void new_project(project *p, unsigned num_agents, unsigned num_tickets) {
p->num_agents = num_agents;
p->num_tickets = num_tickets;
}
typedef struct {
project *pj;
threads_pool *tp;
barrier *b;
} project_params;
static void new_project_params(project_params *pp, project *pj, threads_pool *tp, barrier *b) {
pp->pj = pj;
pp->tp = tp;
pp->b = b;
}
/**
* producer thread
* create agent sell_tickets() threads
*/
static void *checkin(void *params) {
project_params *pp = (project_params *)params;
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(pp->b->checkout_b);
sem_wait(pp->b->checkin_b);
unsigned tickets_pool = pp->pj->num_tickets; // shared resourses
pthread_mutex_t tickets_pool_lock; // mutex object
agent agents[pp->pj->num_agents]; // arguments pass to 10 threads
unsigned id; // agent id
pthread_t tid; // current sell_tickets() thread id
int err; // thread_create() function return the error code
pthread_mutex_init(&tickets_pool_lock, NULL);
for (unsigned i = 0; i < pp->pj->num_agents; i++) {
id = i + 1;
new_agent(&agents[i], id, pp->pj->num_tickets / pp->pj->num_agents, &tickets_pool_lock, &tickets_pool);
sem_wait(pp->tp->producer_sem);
err = pthread_create(pp->tp->pool_addr + (i % pp->tp->pool_size), NULL, sell_tickets, &agents[i]);
if (err != 0) {
printf("error[%d]: can't create thread.", err);
pthread_cancel(*pp->tp->consumer_tid);
pthread_exit(NULL);
} else {
tid = *(pp->tp->pool_addr + (i % pp->tp->pool_size));
printf("thread of agent@%d created, thread id = @%lx\n", id, (unsigned long)tid);
sem_post(pp->tp->consumer_sem);
}
}
/* wait for checkout() thread to finish */
pthread_join(*pp->tp->consumer_tid, NULL);
pthread_mutex_destroy(&tickets_pool_lock);
if (tickets_pool == 0) {
printf("Today's tickets are sold out!\n");
} else {
printf("%d tickets left at the end of the day!\n", tickets_pool);
}
pthread_exit(NULL);
}
/**
* consumer thread
* wait for old agent sell_tickets() thread to exit and increment the producer semaphore
*/
static void *checkout(void *params) {
project_params *pp = (project_params *)params;
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(pp->b->checkin_b);
sem_wait(pp->b->checkout_b);
void *tret; // to store pthread_exit() exit status rval_ptr
int errcode; // pthread_join() return err code
for (unsigned i = 0; i < pp->pj->num_agents; i++) {
sem_wait(pp->tp->consumer_sem);
errcode = pthread_join(*(pp->tp->pool_addr + (i % pp->tp->pool_size)), &tret);
sem_post(pp->tp->producer_sem);
printf("agent@%d has finished his job!\n", *(unsigned *)tret);
}
pthread_exit(NULL);
}
/**
* producer-consumer model using semaphore
* maximum 10 agent sell_tickets() threads work at a time
*/
static void run(void) {
unsigned num_agents = 30;
unsigned num_tickets = 300;
/*
* project
*/
project pj;
new_project(&pj, num_agents, num_tickets);
/*
* thread pool of maximum 10 thread
*/
unsigned threads_pool_size = 10;
pthread_t pool[threads_pool_size];
/*
* checkin() and checkout() threads
*/
pthread_t checkin_tid, checkout_tid;
int checkin_err, checkout_err;
/* a pair of producer-consumer semaphores */
sem_t *checkin_sem, *checkout_sem;
checkin_sem = sem_open("/checkin_sem", O_CREAT, S_IRWXG, threads_pool_size);
checkout_sem = sem_open("/checkout_sem", O_CREAT, S_IRWXG, 0);
/* barrier */
barrier b;
sem_t *checkin_b, *checkout_b;
checkin_b = sem_open("/checkin_b", O_CREAT, S_IRWXG, 0);
checkout_b = sem_open("/checkout_b", O_CREAT, S_IRWXG, 0);
new_barrier(&b, checkin_b, checkout_b);
/* wrap checkin() checkout() parameters into a single structure */
threads_pool tp;
new_threads_pool(&tp, &pool[0], threads_pool_size, checkin_sem, checkout_sem, &checkin_tid, &checkout_tid);
project_params pp;
new_project_params(&pp, &pj, &tp, &b);
/* create checkin(), checkout() threads */
checkin_err = pthread_create(&checkin_tid, NULL, checkin, &pp);
checkout_err = pthread_create(&checkout_tid, NULL, checkout, &pp);
if (checkin_err != 0 || checkout_err != 0) {
if (checkin_err != 0) printf("error: checkin() thread creation failed!\n");
if (checkout_err != 0) printf("error: checkout() thread creation failed!\n");
sem_close(checkin_sem);
sem_close(checkout_sem);
sem_close(checkin_b);
sem_close(checkout_b);
return;
}
/*
* wait for checkin() and checkout() to exit
*/
pthread_join(checkin_tid, NULL);
pthread_join(checkout_tid, NULL);
/*
* free resources
*/
sem_close(checkin_sem);
sem_close(checkout_sem);
sem_close(checkin_b);
sem_close(checkout_b);
}
int main(void) {
run();
}
lldb 表示thread_join() 返回错误代码3:ESRCH。
Process 46631 stopped
* thread #3, stop reason = EXC_BAD_ACCESS (code=1, address=0x0)
frame #0: 0x0000000100003b39 agtktp`checkout(params=0x00007ffeefbff618) at agents_tickets_pool.c:221:46
217 errcode = pthread_join(*(pp->tp->pool_addr + (i % pp->tp->pool_size)), &tret);
218 pthread_t tid = *(pp->tp->pool_addr + (i % pp->tp->pool_size));
219 printf("thread@%u = %lx\n", i + 1, (unsigned long)tid);
220 sem_post(pp->tp->producer_sem);
-> 221 printf("agent@%d has finished his job!\n", *(unsigned *)tret);
222 }
223 pthread_exit(NULL);
224 }
Target 0: (agtktp) stopped.
(lldb) fr v errcode
(int) errcode = 3
(lldb) fr v tid
(pthread_t) tid = 0x0000000000000050
=== 01/03/2021 更新 2 ===
我已经发布了解决方案。如果有问题,请随时纠正我。谢谢大家,尤其是@Semion。
=== 2021 年 1 月 3 日结束更新 2 ===
=== 01/03/2021 更新 1 ===
目前我已经解决了这个问题,发现我在下面的代码中犯了一个典型的错误。
我需要一些时间来重新编辑这个问题并给出答案,以使其对以后的 POSIX Semaphore 用户有用。请不要关闭这个问题(它已经有 2 个关闭投票)。谢谢。
=== 2021 年 1 月 3 日结束更新 1 ===
【问题讨论】:
-
很高兴您能够使用调试器。但请注意,Stack Overflow 不是调试 服务。您有一个想法,“似乎消费者函数
checkout()在生产者checkin()甚至创建新线程之前调用pthread_join()函数。”。您是否尝试过验证这一事实?是的,多线程应用程序在调试时会遇到一些困难。但是printf()和pause()在特定线程和特定位置仍然可以提供帮助。 -
我们确实不是调试服务,尽管我们有时会回答调试问题。我们通常希望此类问题提供minimal reproducible example,而您的问题则没有。我对坚持使用 MRE 没有任何疑虑,因为构建 MRE 本身就是一种强大的调试技术。
-
感谢@JohnBollinger,我完全尊重 MRE 的价值并继续这样做。考虑到线程实现依赖于系统,我正在使用 mac os,如果你在 Linux 上进行测试,我不能保证它的行为保持不变。即使在我的机器上使用相同的代码,线程顺序总是会改变。这就是为什么并发调试很棘手。但仍然感谢您的建议。
-
当我在 linux 上运行您的代码时,在第二次运行时以及在
sem_post(pp->b->checkout_b);上崩溃之后,因为sem_open("/checkin_b", O_CREAT, S_IRWXG, 0)失败。lsing/dev/shm表示使用----r-x---创建的信号量,所以第一次运行后无法再次打开。更改为S_IRWXU并删除旧信号量为我解决了问题。 -
在 linux 中,信号量似乎也以文件形式存在于 /dev/shm 中,因此
ls -l /dev/shm给出了具有文件权限的它们的列表。很高兴听到你发现了这个错误 :)
标签: c concurrency pthreads ipc semaphore