1 //线程池 2 3 #include <stdio.h> 4 #include <string.h> 5 #include <unistd.h> 6 #include <sys/types.h> 7 #include <pthread.h> 8 #include <assert.h> 9 #include <stdlib.h> 10 //任务链表 11 typedef struct worker 12 { 13 void(*process)(void *arg); 14 void *arg;//回调函数的参数 15 struct worker *next; 16 }CThread_worker; 17 18 19 //线程池结构 20 typedef struct 21 { 22 pthread_mutex_t queue_lock; 23 pthread_cond_t queue_ready; 24 25 //链表结构 线程池中所有的等到任务 26 CThread_worker *queue_head; 27 28 //是否销毁线程池 29 int shutdown; 30 pthread_t *threadid; 31 //最大活动线程数目 32 int max_pthread_num; 33 //当前等待队列的任务数目 34 int cur_queue_size; 35 }CThread_pool; 36 37 38 39 40 int pool_add_worker(void *(*process)(void *arg), void *arg); 41 void *thread_routine(void *arg); 42 43 44 static CThread_pool *pool = NULL; 45 46 //初始化线程池 47 void 48 pool_init(int max_pthread_num) 49 { 50 pool = (CThread_pool *)malloc(sizeof(CThread_pool)); 51 52 //互斥锁的初始化 53 // int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); 54 // pthread_mutex_init() 函数是以动态方式创建互斥锁的,参数attr指定了新建互斥锁的属性。如果参数attr为空,则使用默认的互斥锁属性, 55 //默认属性为快速互斥锁 。互斥锁的属性在创建锁的时候指定. 56 pthread_mutex_init(&(pool->queue_lock),NULL); 57 58 //初始化一个条件变量 59 //extern int pthread_cond_init __P ((pthread_cond_t *__cond,__const pthread_condattr_t *__cond_attr)); 60 //cond是一个指向结构pthread_cond_t的指针 cond_attr是一个指向结构pthread_condattr_t的指针 61 // 结构pthread_condattr_t是条件变量的属性结构,和互斥锁一样我们可以用它来设置条件变量是进程内可用还是进程间可用, 62 //默认值是PTHREAD_ PROCESS_PRIVATE,即此条件变量被同一进程内的各个线程使用; 63 //如果选择为PTHREAD_PROCESS_SHARED则为多个进程间各线程公用。注意初始化条件变量只有未被使用时才能重新初始化或被释放。 64 65 pthread_cond_init(&(pool->queue_ready),NULL); 66 67 68 pool->queue_head = NULL; 69 70 pool->max_pthread_num = max_pthread_num; 71 72 pool->cur_queue_size = 0; 73 74 pool->shutdown = 0; 75 76 pool->threadid = (pthread_t *)malloc(max_pthread_num * sizeof(pthread_t)); 77 78 int i = 0; 79 //创建线程池 80 for( i = 0 ; i < max_pthread_num; i++) 81 { 82 pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL); 83 } 84 } 85 86 //向线程池加一个任务 87 int 88 pool_add_worker(void *(*process)(void *arg), void *arg) 89 { 90 CThread_worker *newworker = (CThread_worker *)malloc(sizeof(CThread_worker)); 91 92 newworker->process = process; 93 newworker->arg = arg; 94 newworker->next = NULL; 95 96 pthread_mutex_lock(&(pool->queue_lock)); 97 98 //将任务加入到等待队列中 99 CThread_worker *member = pool->queue_head; 100 101 if( member != NULL) 102 { 103 while(member->next != NULL) 104 { 105 member = member->next; 106 } 107 member->next = newworker; 108 } 109 else 110 { 111 pool->queue_head = newworker; 112 } 113 114 //assert(pool->queue_ready != NULL); 115 116 pool->cur_queue_size ++; 117 118 pthread_mutex_unlock(&(pool->queue_lock)); 119 120 //等待队列有任务,唤醒一个等待线程 121 // pthread_cond_signal函数的作用是发送一个信号给另外一个正在处于阻塞等待状态的线程, 122 //使其脱离阻塞状态,继续执行.如果没有线程处在阻塞等待状态,pthread_cond_signal也会成功返回。 123 //pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个; 124 pthread_cond_signal(&(pool->queue_ready)); 125 126 return 0; 127 } 128 129 //销毁线程池 等待队列中的任务不会再被执行,但是正在运行的线程会一直把任务运行完再退出 130 131 int 132 pool_destroy() 133 { 134 if( pool->shutdown) 135 return -1; 136 pool->shutdown = 1; 137 138 //唤醒所有的等待线程,线程池要销毁 139 //如线程池,pthread_cond_broadcast唤醒全部线程 140 pthread_cond_broadcast(&(pool->queue_ready)); 141 142 //阻塞等待线程退出 否则就成僵尸线成 143 144 int i; 145 146 for( i = 0 ; i < pool->max_pthread_num; i++) 147 { 148 pthread_join(pool->threadid[i],NULL); 149 } 150 free(pool->threadid); 151 152 153 //销毁等待的线程 154 CThread_worker *head = NULL; 155 while(pool->queue_head != NULL) 156 { 157 head = pool->queue_head; 158 pool->queue_head = pool->queue_head->next; 159 free(head); 160 } 161 162 //条件变量和互辆 163 pthread_mutex_destroy(&(pool->queue_lock)); 164 pthread_cond_destroy(&(pool->queue_ready)); 165 166 free(pool); 167 168 pool = NULL; 169 return 0; 170 } 171 172 void * 173 thread_routine(void *arg) 174 { 175 printf("strarting thread 0x%x\n",pthread_self()); 176 177 while(1) 178 { 179 pthread_mutex_lock(&(pool->queue_lock)); 180 181 //如果等待队列为0,不销毁线程池 则处于阻塞状态 182 while( pool->cur_queue_size == 0 && !pool->shutdown) 183 { 184 printf("thread 0x%x is waiting \n",pthread_self()); 185 // int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) 186 //待条件有两种方式:无条件等待pthread_cond_wait()和计时等待pthread_cond_timedwait(), 187 //其中计时等待方式如果在给定时刻前条件没有满足,则返回ETIMEOUT,结束等待 188 //无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求pthread_cond_wait()(或pthread_cond_timedwait(),下同)的竞争条件(Race Condition)。// 189 //mutex互斥锁必须是普通锁(PTHREAD_MUTEX_TIMED_NP)或者适应锁(PTHREAD_MUTEX_ADAPTIVE_NP),且在调用pthread_cond_wait()前必须由本线程加锁(pthread_mutex_lock()), 190 //而在更新条件等待队列以前,mutex保持锁定状态,并在线程挂起进入等待前解锁。 191 //在条件满足从而离开pthread_cond_wait()之前,mutex将被重新加锁,以与进入pthread_cond_wait()前的加锁动作对应。 192 pthread_cond_wait(&(pool->queue_ready),&(pool->queue_lock)); 193 194 } 195 196 197 //线程池要销毁 198 if( pool->shutdown) 199 { 200 pthread_mutex_unlock(&(pool->queue_lock)); 201 printf("thread 0x%x will exit\n",pthread_self()); 202 pthread_exit(NULL); 203 } 204 205 printf("thread 0x%x is strarting to work \n",pthread_self()); 206 207 assert(pool->cur_queue_size != 0); 208 assert(pool->queue_head != NULL); 209 210 pool->cur_queue_size --; 211 CThread_worker *worker = pool->queue_head; 212 213 pool->queue_head = worker->next; 214 pthread_mutex_unlock(&(pool->queue_lock)); 215 216 //调用回调函数 217 (*(worker->process))(worker->arg); 218 free(worker); 219 worker = NULL; 220 } 221 pthread_exit(NULL); 222 } 223 224 225 void * 226 myprocess(void *arg) 227 { 228 printf("thread is 0x%x , working on task %d\n",pthread_self(),*(int *)arg); 229 sleep(1); 230 return NULL; 231 } 232 233 234 int 235 main(int argc, char *argv[]) 236 { 237 pool_init(3); 238 int *workingnum = (int *)malloc(sizeof(int) *10); 239 240 int i; 241 for( i =0; i < 10; i++) 242 { 243 workingnum[i] = i; 244 pool_add_worker(myprocess,&workingnum[i]); 245 } 246 247 sleep(5); 248 pool_destroy(); 249 free(workingnum); 250 return 0; 251 }
相关文章: