/* common.h */
/* Status codes shared by the queue and thread-pool modules. */
#ifndef _COMMON_H_
#define _COMMON_H_

/* Success / "true" result of the predicate-style helpers. */
#define OK 0

/*
 * Failure result.  Parenthesized so the expansion stays a single operand
 * wherever the macro is used inside a larger expression.
 */
#define ERROR (-1)

#endif
/* log.h */
/* Logging front end; in this configuration every dump() call compiles away. */
#ifndef _LOG_H_
#define _LOG_H_

#undef dump

/*
 * dump(level, fmt, ...) - log statement placeholder.
 *
 * Expands to an empty statement, so none of the arguments are evaluated.
 * Uses the standard C99 variadic form instead of the GNU named-variadic
 * extension (`x...`) so the header is accepted by any conforming compiler.
 */
#define dump(level, ...) do {} while (0)

#endif
queue.h ifndef _QUEUE_H_ #define _QUEUE_H_ #define QUEUE_SIZE 2000 typedef void (*fp) (void *); //初始化队列 void* q_init(size_t size, void *pool); int q_add(void *q, void *func, void *arg); void q_print(void *q, fp p); void print_pointer(void *data); void q_drop(void *q); int q_isempty(void *q); #endif
queue.c #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <pthread.h> #include "common.h" #include "threadpool.h" #include "utils.h" #include "log.h" #include "queue.h" //队列节点数据结构 typedef struct _Queue_node{ void *func; void *arg; } Q_node; //队列结构 typedef struct _Queue{ Q_node *first; Q_node *last; size_t size; Q_node *head; void *pool; pthread_mutex_t mutex; pthread_cond_t has_data_cond; pthread_cond_t drop_cond; char is_close; pthread_t tid; } Q_t; void q_remove(Q_t *q, void** func, void** arg); int q_isfull(Q_t *q); void* q_thread(void *q); void print_p(void *data); int q_length(Q_t *q); void* q_init(size_t size, void *pool) { ASSERT(size > 0); ASSERT(pool); Q_t *q = (Q_t*)malloc(sizeof(Q_t)); if (NULL == q) return NULL; Q_node *qn = (Q_node*)malloc(size * sizeof(Q_node)); if (NULL == qn) return NULL; q->first = qn; q->last = qn; q->head = qn; q->size = size; q->pool = pool; q->is_close = 0; memset(q->head, 0, size * sizeof(Q_node)); pthread_mutex_init(&q->mutex, NULL); pthread_cond_init(&q->has_data_cond, NULL); pthread_cond_init(&q->drop_cond, NULL); if (0 != pthread_create(&q->tid, NULL, q_thread, q)) { free(q->head); q->head = NULL; free(q); q = NULL; dump(L_ERROR, "Create thread q_thread error"); } return (void*)q; } int q_add(void *qt, void *func, void *arg) { ASSERT(qt); ASSERT(func); /*arg possible NULL*/ Q_t *q = (Q_t*)qt; pthread_mutex_lock(&q->mutex); if (OK == q_isfull(q)) { pthread_mutex_unlock(&q->mutex); return ERROR; } ASSERT(q->last->func == NULL); q->last->func = func; q->last->arg = arg; q->last++; if (q->last == q->first + 1) { ASSERT(1 == q_length(q)); dump(L_DEBUG, "Queue send signal, has data"); pthread_cond_signal(&q->has_data_cond); } if (q->last == q->head + q->size) q->last = q->head; dump(L_DEBUG, "Add Queue Length %d", q_length(q)); pthread_mutex_unlock(&q->mutex); return OK; } int q_isempty(void *qt) { Q_t *q = qt; ASSERT(q); if (q->last == q->first) { if (NULL == q->first->func) { dump(L_INFO, 
"Queue is empty"); return OK; } } return ERROR; } int q_isfull(Q_t *q) { ASSERT(q); if (q->last == q->first) { if (NULL != q->last->func) { dump(L_INFO, "Queue is full"); return OK; } } return ERROR; } void* q_thread(void *qt) { ASSERT(qt); Q_t *q = qt; void *func; void *arg; while(0 == q->is_close) { pthread_mutex_lock(&q->mutex); if (OK == q_isempty(q)) { pthread_cond_wait(&q->has_data_cond, &q->mutex); } /* possible close signal has_data cond */ if (0 != q->is_close) { pthread_mutex_unlock(&q->mutex); break; } q_remove(q, &func, &arg); pthread_mutex_unlock(&q->mutex); tp_add(q->pool, func, arg); //unlock first then tp add, let q_add go } tp_drop(q->pool); q->pool = NULL; pthread_mutex_lock(&q->mutex); pthread_cond_signal(&q->drop_cond); pthread_mutex_unlock(&q->mutex); pthread_exit(NULL); return NULL; } void q_remove(Q_t *q, void** func, void** arg) { ASSERT(q); ASSERT(q->first->func); ASSERT(func); ASSERT(arg); *func = q->first->func; *arg = q->first->arg; q->first->func = NULL; q->first->arg = NULL; q->first++; if (q->first == q->head + q->size) q->first = q->head; dump(L_DEBUG, "Remove Queue Length %d", q_length(q)); //q_print(q, print_p); } void q_drop(void *qt) { ASSERT(qt); Q_t* q = (Q_t*)qt; pthread_mutex_lock(&q->mutex); q->is_close = 1; /* let q thread not wait */ if (OK == q_isempty(q)) pthread_cond_signal(&q->has_data_cond); pthread_cond_wait(&q->drop_cond, &q->mutex); pthread_join(q->tid, NULL); pthread_mutex_unlock(&q->mutex); pthread_mutex_destroy(&q->mutex); pthread_cond_destroy(&q->has_data_cond); pthread_cond_destroy(&q->drop_cond); free(q->head); q->head = NULL; free(q); } int q_length(Q_t *q) { ASSERT(q); if (q->last < q->first) { return q->size - (q->first - q->last); } else { return q->last - q->first; } } void q_print(void *qt, fp p) { ASSERT(qt); ASSERT(p); Q_t *q = qt; Q_node* qn; int i; for (i = 0, qn = q->head; i < q->size; i++, qn++) { p(qn->func); //p(qn->arg); } } void print_p(void* data) { if (NULL == data) printf("[ ]"); else 
printf("[x]"); } /* int main() { void* d; Q_t *q = q_init(10,NULL); int z1 = 1; int z2 = 2; int z3 = 3; q_add(q, &z1, NULL); q_add(q, &z2, NULL); q_add(q, &z3, NULL); q_print(q, stdout); return 1; } */
/* util.h */
/* Debug assertion support. */
#ifndef _UTILS_H_
#define _UTILS_H_

#include <sys/time.h>

/* Failure handler invoked by ASSERT with the expression text, file and line. */
void _Assert(char*, char*, unsigned);

/*
 * ASSERT(f): when DEBUG is defined, calls _Assert() if f evaluates to false.
 * Without DEBUG it expands to ((void)0) and f is not evaluated at all.
 */
#ifndef DEBUG
#define ASSERT(f) ((void)0)
#else
#define ASSERT(f) ((f) ? (void)0 : _Assert(#f, __FILE__, __LINE__))
#endif

#endif
util.c #include <stdio.h> #include <string.h> #include <sys/socket.h> #include <sys/types.h> #include <netdb.h> #include <time.h> #include <sys/time.h> #include <unistd.h> #include <sys/stat.h> #include <fcntl.h> #include <stdlib.h> #include <stdarg.h> #include <errno.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/wait.h> #include <sys/ioctl.h> #include <net/if.h> #include "common.h" #include "utils.h" #include "log.h" void _Assert (char* name, char* strFile, unsigned uLine) { dump(L_ERROR, "Assertion failed: %s, %s, line %u", name, strFile, uLine); abort(); }
/* threadpool.h */
/* Public interface of the thread pool. */
#ifndef _THREADPOOL_
#define _THREADPOOL_

#include <sys/time.h> /* struct timeval, used by tv_sub() */

#define MAX_POOL 200        /* suggested upper bound on worker threads */
#define INIT_POOL 20        /* suggested number of threads created up front */
#define STACKSIZE 1048576   /* stack size, in bytes, of each worker thread */

/* Signature of a job executed by the pool. */
typedef void (*func_pointer) (void *);

/*
 * Create a pool of at most `max` threads, `init` of which are started
 * immediately.  Returns an opaque pool handle, or NULL on failure.
 */
void* tp_init(int max, int init);

/* Run fp(arg) on a pool thread.  Returns 0 on success, -1 on failure. */
int tp_add(void *pool, func_pointer fp, void *arg);

/* Shut the pool down and release it.  Always returns NULL. */
void* tp_drop(void *pool);

/* In-place subtraction: *out = *out - *in. */
void tv_sub(struct timeval *out, const struct timeval *in);

#endif
threadpool.c #include <pthread.h> #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <sys/time.h> #include <sys/select.h> #include "threadpool.h" #include "log.h" #include "utils.h" typedef struct _Thread { pthread_t id; pthread_mutex_t mutex; pthread_cond_t cond; func_pointer func; void *arg; void *parent; } Thread_t; typedef struct _Threadpool_t { pthread_mutex_t mutex; pthread_cond_t idle_cond; /*拿到条件 意味着有空线程 可利用 */ pthread_cond_t allidle_cond; /*拿到条件 意味着线程都闲置了 */ pthread_cond_t drop_cond; /*拿到条件 意味着线程都退出了,可删除线程池了 */ Thread_t **thread_list; /*线程数组 */ int thread_idle_num; /*空闲线程个数 */ int thread_hwm_num; /*创建线程的高水位 */ int thread_max_num; int is_close; /*线程池关闭标志 1 是要关闭 */ } Tp_t; void* t_thread(void *arg); int t_idle(Tp_t *pool, Thread_t *thread); void tv_sub(struct timeval *out,const struct timeval *in); void init_func(void* arg); void* tp_init (int max, int init) { int i; ASSERT(max > 0); ASSERT(init >= 0); ASSERT(max >= init); Tp_t *pool = NULL; if (NULL == (pool = (Tp_t*)malloc(sizeof(Tp_t)))) { return NULL; } if (NULL == (pool->thread_list = (Thread_t**)malloc(sizeof(Thread_t*) * max))) { free(pool); pool = NULL; return NULL; } memset(pool->thread_list, 0, sizeof(Thread_t*) * max); pthread_mutex_init(&pool->mutex,NULL); pthread_cond_init(&pool->idle_cond,NULL); pthread_cond_init(&pool->allidle_cond,NULL); pthread_cond_init(&pool->drop_cond,NULL); pool->thread_idle_num = 0; pool->thread_hwm_num = 0; pool->is_close = 0; pool->thread_max_num = max; dump(L_SUCCESS, "create thread pool %d successfully!!", pool->thread_max_num); for (i = 0; i < init; i++) { if (-1 == tp_add(pool, init_func, NULL)) { free(pool->thread_list); pool->thread_list = NULL; free(pool); pool = NULL; return NULL; } } dump(L_SUCCESS, "init thread pool %d/%d successfully!!", pool->thread_hwm_num, pool->thread_max_num); return (void *)pool; } /* execute job and then idle thread, then cond_wait , if is_close ,then return*/ void* t_thread(void *arg) { if (NULL 
== arg) return NULL; Thread_t *thread =(Thread_t*)arg; Tp_t *pool = thread->parent; /* change while to do while for a thread add, then quickly pool drop the thread possible cant execute once */ do { struct timeval start_time; struct timeval end_time; gettimeofday(&start_time,NULL); if (thread->func) thread->func(thread->arg); gettimeofday(&end_time,NULL); tv_sub(&end_time,&start_time); pthread_mutex_lock(&thread->mutex); if (0 == t_idle(pool,thread)) { pthread_cond_wait(&thread->cond,&thread->mutex); pthread_mutex_unlock(&thread->mutex); } else { pthread_mutex_unlock(&thread->mutex); pthread_cond_destroy(&thread->cond); pthread_mutex_destroy(&thread->mutex); free(thread); break; } } while(0 == pool->is_close); /* pool is close or thread is exeception idle_thread failure*/ pthread_mutex_lock(&pool->mutex); pool->thread_hwm_num--; if (pool->thread_hwm_num <= 0) pthread_cond_signal(&pool->drop_cond); pthread_mutex_unlock(&pool->mutex); return NULL; } /*let this thread idle ,can be use */ int t_idle(Tp_t *pool,Thread_t *thread) { pthread_mutex_lock(&pool->mutex); pool->thread_list[pool->thread_idle_num] = thread; /*空闲的线程指针放回来*/ pool->thread_idle_num++; /*假设有释放后有5个线程,修改的是thread_list[4]*/ dump(L_DEBUG, "One Thread is idle %d %lu", pool->thread_idle_num, pthread_self()); pthread_cond_signal(&pool->idle_cond); /*这个线程可用了,告诉有线程空闲,解决线程不足的等待*/ if (pool->thread_idle_num >= pool->thread_hwm_num) /*如果所有线程都idle*/ pthread_cond_signal(&pool->allidle_cond); pthread_mutex_unlock(&pool->mutex); return 0; } /*往线程池里加任务,参数为任务要执行的函数和参数*/ int tp_add(void *_pool,func_pointer fp,void *arg) { //arg can be NULL if ((NULL == _pool)||(NULL == fp)) { return -1; } Tp_t *pool = (Tp_t*)_pool; Thread_t *thread = NULL; /*可用线程*/ pthread_mutex_lock(&pool->mutex); if (pool->thread_idle_num <= 0) { /*无空闲线程*/ if (pool->thread_hwm_num >= pool->thread_max_num) { /*无法再创建线程了,到达上限了,只能等待到有空闲线程*/ dump(L_FAILURE,"Idle thread is none, begin hang..."); pthread_cond_wait(&pool->idle_cond,&pool->mutex); } else { 
pthread_attr_t attr; /*创建线程*/ if (NULL == (thread = (Thread_t*)malloc(sizeof(Thread_t)))) { dump(L_FAILURE,"error-----\n"); pthread_mutex_unlock(&pool->mutex); return -1; } memset(thread,0,sizeof(Thread_t)); pthread_mutex_init(&thread->mutex,NULL); pthread_cond_init(&thread->cond,NULL); thread->func = fp; thread->arg = arg; thread->parent = pool; pthread_attr_init(&attr); pthread_attr_setstacksize (&attr, STACKSIZE); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); if (0 == pthread_create(&thread->id,&attr,t_thread,thread)) { pool->thread_hwm_num++; dump(L_DEBUG, "Create thread successfully %d-%d", pool->thread_hwm_num, pool->thread_max_num); pthread_attr_destroy(&attr); pthread_mutex_unlock(&pool->mutex); return 0; /* 这里idle不会增加,只有在idle_thread + 1 */ } else { pthread_mutex_destroy(&thread->mutex); pthread_cond_destroy(&thread->cond); pthread_attr_destroy(&attr); free(thread); pthread_mutex_unlock(&pool->mutex); return -1; } } } /*有空闲线程 1 本来就有 2 等了一会有的*/ /*选择最上面的一个可用线程*/ pool->thread_idle_num--; /*假设有5个空闲线程,那么能使用的就是thread_list[4]*/ thread = pool->thread_list[pool->thread_idle_num]; pool->thread_list[pool->thread_idle_num] = NULL; /*取出后,清空,因为不一定放回这个位置*/ pthread_mutex_lock(&thread->mutex); thread->func = fp; thread->arg = arg; thread->parent = pool; pthread_cond_signal(&thread->cond); /*通知该线程,开始干活*/ pthread_mutex_unlock(&thread->mutex); dump(L_SUCCESS,"pool state : idle thread %d allocate thread %d", pool->thread_idle_num, pool->thread_hwm_num); pthread_mutex_unlock(&pool->mutex); return 0; } /* drop thread pool*/ void* tp_drop(void *_pool) { if (NULL == _pool) { dump(L_FAILURE,"drop threadpool error\n"); return NULL; } int i; Tp_t *pool = (Tp_t *)_pool; Thread_t *thread = NULL; pthread_mutex_lock(&pool->mutex); pool->is_close = 1; /*wait all thread idle, only idle then signal*/ if (pool->thread_hwm_num > pool->thread_idle_num) { dump(L_SUCCESS,"Wait %d thread idle", pool->thread_hwm_num - pool->thread_idle_num); pthread_cond_wait(&pool->allidle_cond, 
&pool->mutex); } ASSERT(pool->thread_hwm_num == pool->thread_idle_num); /* signal all thread, let thread exit */ for(i = 0; i<pool->thread_hwm_num; i++) { thread = pool->thread_list[i]; pthread_mutex_lock(&thread->mutex); pthread_cond_signal(&thread->cond); pthread_mutex_unlock(&thread->mutex); } /* some thread not exit */ if (pool->thread_hwm_num > 0) { dump(L_SUCCESS,"Wait %d thread empty", pool->thread_hwm_num); pthread_cond_wait(&pool->drop_cond, &pool->mutex); } /* now all thread exit */ ASSERT(pool->thread_hwm_num == 0); for(i = 0; i < pool->thread_idle_num; i++) { free(pool->thread_list[i]); pool->thread_list[i] = NULL; } pthread_mutex_unlock(&pool->mutex); pthread_mutex_destroy(&pool->mutex); pthread_cond_destroy(&pool->idle_cond); pthread_cond_destroy(&pool->allidle_cond); pthread_cond_destroy(&pool->drop_cond); free(pool->thread_list); pool->thread_list = NULL; free(pool); return NULL; } void tv_sub(struct timeval* out,const struct timeval* in) { /*out->tv_usec < in->tv_usec*/ if((out->tv_usec -= in->tv_usec) < 0) { /*negative -> postive 1,000,000 = 1s*/ (out->tv_sec)--; out->tv_usec+=1000000; } out->tv_sec-=in->tv_sec; } /* only use init thread */ void init_func(void* arg) { sleep(2); }
test.c #include <stdio.h> #include <unistd.h> #include <sys/select.h> #include "common.h" #include "threadpool.h" #include "queue.h" #include "log.h" #define DEBUG 1 void f1(void *arg) { printf("call f1\n"); sleep(1); } void f2(void *arg) { printf("call f2\n"); sleep(3); } void f3(void *arg) { printf("call f3\n"); sleep(2); } int main() { void *pool = NULL; void *q = NULL; if (NULL == (pool = tp_init(MAX_POOL, INIT_POOL))) { printf("tp_init error"); return ERROR; } if (NULL == (q = q_init(QUEUE_SIZE, pool))) { printf("q_init error"); return ERROR; } int i = 0; for (i = 0; i < 20; i++) { q_add(q, f1, NULL); q_add(q, f2, NULL); q_add(q, f3, NULL); } while(OK != q_isempty(q)) { sleep(0.1); } q_drop(q); return OK; }