有时我们会需要大量线程来处理一些相互独立的任务,为了避免频繁的申请释放线程所带来的开销,我们可以使用线程池。下面是一个C语言实现的简单的线程池。

头文件:

#ifndef THREAD_POOL_H__
#define THREAD_POOL_H__
   3:  
#include <pthread.h>
   5:  
/* 要执行的任务链表 */
struct tpool_work {
/* 任务函数 */
/* 传入任务函数的参数 */
struct tpool_work   *next;                    
  11: }tpool_work_t;
  12:  
struct tpool {
/* 线程池是否销毁 */
/* 最大线程数 */
/* 线程ID数组 */
/* 线程链表 */
  18:     pthread_mutex_t queue_lock;                    
  19:     pthread_cond_t  queue_ready;    
  20: }tpool_t;
  21:  
/*
 * @brief     创建线程池 
 * @param     max_thr_num 最大线程数
 * @return     0: 成功 其他: 失败  
 */
int
int max_thr_num);
  29:  
/*
 * @brief     销毁线程池 
 */
void
  34: tpool_destroy();
  35:  
/*
 * @brief     向线程池中添加任务
 * @param    routine 任务函数指针
 * @param     arg 任务函数参数
 * @return     0: 成功 其他:失败 
 */
int
void *arg);
  44:  
#endif

实现:

#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
   6:  
   8:  
static tpool_t *tpool = NULL;
  10:  
/* 工作者线程函数, 从任务链表中取出任务并执行 */
void* 
void *arg)
  14: {
  15:     tpool_work_t *work;
  16:     
while(1) {
/* 如果线程池没有被销毁且没有任务要执行,则等待 */
  19:         pthread_mutex_lock(&tpool->queue_lock);
while(!tpool->queue_head && !tpool->shutdown) {
  21:             pthread_cond_wait(&tpool->queue_ready, &tpool->queue_lock);
  22:         }
if (tpool->shutdown) {
  24:             pthread_mutex_unlock(&tpool->queue_lock);
  25:             pthread_exit(NULL);
  26:         }
  27:         work = tpool->queue_head;
  28:         tpool->queue_head = tpool->queue_head->next;
  29:         pthread_mutex_unlock(&tpool->queue_lock);
  30:  
  31:         work->routine(work->arg);
  32:         free(work);
  33:     }
  34:     
return NULL;   
  36: }
  37:  
/*
 * 创建线程池 
 */
int
int max_thr_num)
  43: {
int i;
  45:  
sizeof(tpool_t));
if (!tpool) {
, __FUNCTION__);
  49:         exit(1);
  50:     }
  51:     
/* 初始化 */
  53:     tpool->max_thr_num = max_thr_num;
  54:     tpool->shutdown = 0;
  55:     tpool->queue_head = NULL;
if (pthread_mutex_init(&tpool->queue_lock, NULL) !=0) {
,
  58:             __FUNCTION__, errno, strerror(errno));
  59:         exit(1);
  60:     }
if (pthread_cond_init(&tpool->queue_ready, NULL) !=0 ) {
, 
  63:             __FUNCTION__, errno, strerror(errno));
  64:         exit(1);
  65:     }
  66:     
/* 创建工作者线程 */
sizeof(pthread_t));
if (!tpool->thr_id) {
, __FUNCTION__);
  71:         exit(1);
  72:     }
for (i = 0; i < max_thr_num; ++i) {
if (pthread_create(&tpool->thr_id[i], NULL, thread_routine, NULL) != 0){
, __FUNCTION__, 
  76:                 errno, strerror(errno));
  77:             exit(1);
  78:         }
  79:         
  80:     }    
  81:  
return 0;
  83: }
  84:  
/* 销毁线程池 */
void
  87: tpool_destroy()
  88: {
int i;
  90:     tpool_work_t *member;
  91:  
if (tpool->shutdown) {
return;
  94:     }
  95:     tpool->shutdown = 1;
  96:  
/* 通知所有正在等待的线程 */
  98:     pthread_mutex_lock(&tpool->queue_lock);
  99:     pthread_cond_broadcast(&tpool->queue_ready);
 100:     pthread_mutex_unlock(&tpool->queue_lock);
for (i = 0; i < tpool->max_thr_num; ++i) {
 102:         pthread_join(tpool->thr_id[i], NULL);
 103:     }
 104:     free(tpool->thr_id);
 105:  
while(tpool->queue_head) {
 107:         member = tpool->queue_head;
 108:         tpool->queue_head = tpool->queue_head->next;
 109:         free(member);
 110:     }
 111:  
 112:     pthread_mutex_destroy(&tpool->queue_lock);    
 113:     pthread_cond_destroy(&tpool->queue_ready);
 114:  
 115:     free(tpool);    
 116: }
 117:  
/* 向线程池添加任务 */
int
void *arg)
 121: {
 122:     tpool_work_t *work, *member;
 123:     
if (!routine){
, __FUNCTION__);
return -1;
 127:     }
 128:     
sizeof(tpool_work_t));
if (!work) {
, __FUNCTION__);
return -1;
 133:     }
 134:     work->routine = routine;
 135:     work->arg = arg;
 136:     work->next = NULL;
 137:  
 138:     pthread_mutex_lock(&tpool->queue_lock);    
 139:     member = tpool->queue_head;
if (!member) {
 141:         tpool->queue_head = work;
else {
while(member->next) {
 144:             member = member->next;
 145:         }
 146:         member->next = work;
 147:     }
/* 通知工作者线程,有新任务添加 */
 149:     pthread_cond_signal(&tpool->queue_ready);
 150:     pthread_mutex_unlock(&tpool->queue_lock);
 151:  
return 0;    
 153: }
 154:     
 155:  

测试代码:

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
   5:  
void *arg)
   7: {
int)arg);
return NULL;
  10: }
  11:  
int
char **argv)
  14: {
if (tpool_create(5) != 0) {
);
  17:         exit(1);
  18:     }
  19:     
int i;
for (i = 0; i < 10; ++i) {
void*)i);
  23:     }
  24:     sleep(2);
  25:     tpool_destroy();
return 0;
  27: }

这个实现是在调用tpool_destroy之后,仅将当前正在执行的任务完成之后就会退出,我们也可以修改代码使得线程池在执行完任务链表中所有任务后再退出。

相关文章: