shutdown = 1;//开始自爆pthread_cond_broadcast(//诱杀。用于锁住队列 高并发服务器04:线程池【1个锁、2个条件变量(一个用于阻塞“取( 二 )。" />

用于锁住队列 高并发服务器04:线程池【1个锁、2个条件变量(一个用于阻塞“取( 二 )

< thrnum;i++){pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程}//printf("end call %s-----\n",__FUNCTION__);}//摧毁线程池void destroy_threadpool(ThreadPool *pool){pool->shutdown = 1;//开始自爆pthread_cond_broadcast(&pool->not_empty_task);//诱杀 int i = 0;for(i = 0; i < pool->thr_num ; i++){pthread_join(pool->threads[i],NULL);}pthread_cond_destroy(&pool->not_empty_task);pthread_cond_destroy(&pool->empty_task);pthread_mutex_destroy(&pool->pool_lock);free(pool->tasks);free(pool->threads);free(pool);}//添加任务到线程池void addtask(ThreadPool *pool){//printf("begin call %s-----\n",__FUNCTION__);pthread_mutex_lock(&pool->pool_lock);//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)while(pool->max_job_num <= pool->job_num){pthread_cond_wait(&pool->empty_task,&pool->pool_lock);}int taskpos = (pool->job_push++)%pool->max_job_num;//printf("add task %dtasknum===%d\n",taskpos,beginnum);pool->tasks[taskpos].tasknum = beginnum++;pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];pool->tasks[taskpos].task_func = taskRun;pool->job_num++;pthread_mutex_unlock(&pool->pool_lock);pthread_cond_signal(&pool->not_empty_task);//通知包身工//printf("end call %s-----\n",__FUNCTION__);}//任务回调函数void taskRun(void *arg){PoolTask *task = (PoolTask*)arg;int num = task->tasknum;printf("task %d is runing %lu\n",num,pthread_self());sleep(1);printf("task %d is done %lu\n",num,pthread_self());}int main(){create_threadpool(3,20);int i = 0;for(i = 0;i < 50 ; i++){addtask(thrPool);//模拟添加任务}sleep(20);destroy_threadpool(thrPool);return 0;}
三、线程池代码【复杂版】
.h
#ifndef __THREADPOOL_H_#define __THREADPOOL_H_typedef struct threadpool_t threadpool_t;/*** @function threadpool_create* @descCreates a threadpool_t object.* @param thr_numthread num* @param max_thr_nummax thread size* @param queue_max_sizesize of the queue.* @return a newly created thread pool or NULL*/threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);/*** @function threadpool_add* @desc add a new task in the queue of a thread pool* @param poolThread pool to which add the task.* @param function Pointer to the function that will perform the task.* @param argument Argument to be passed to the function.* @return 0 if all goes well,else -1*/int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);/*** @function threadpool_destroy* @desc Stops and destroys a thread pool.* @param poolThread pool to destroy.* @return 0 if destory success else -1*/int threadpool_destroy(threadpool_t *pool);/*** @desc get the thread num* @pool pool threadpool* @return # of the thread*/int threadpool_all_threadnum(threadpool_t *pool);/*** desc get the busy thread num* @param pool threadpool* return # of the busy thread*/int threadpool_busy_threadnum(threadpool_t *pool);#endif
.c
#include #include 【用于锁住队列高并发服务器04:线程池【1个锁、2个条件变量(一个用于阻塞“取】#include #include #include #include #include #include #include "threadpool.h"#define DEFAULT_TIME 10/*10s检测一次*/#define MIN_WAIT_TASK_NUM 10/*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ #define DEFAULT_THREAD_VARY 10/*每次创建和销毁线程的个数*/#define true 1#define false 0typedef struct {void *(*function)(void *);/* 函数指针,回调函数 */void *arg;/* 上面函数的参数 */} threadpool_task_t;/* 各子线程任务结构体 *//* 描述线程池相关信息 */struct threadpool_t {pthread_mutex_t lock;/* 用于锁住本结构体 */pthread_mutex_t thread_counter;/* 记录忙状态线程个数de琐 -- busy_thr_num */pthread_cond_t queue_not_full;/* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */pthread_cond_t queue_not_empty;/* 任务队列里不为空时,通知等待任务的线程 */pthread_t *threads;/* 存放线程池中每个线程的tid 。数组 */pthread_t adjust_tid;/* 存管理线程tid */threadpool_task_t *task_queue;/* 任务队列(数组首地址) */int min_thr_num;/* 线程池最小线程数 */int max_thr_num;/* 线程池最大线程数 */int live_thr_num;/* 当前存活线程个数 */int busy_thr_num;/* 忙状态线程个数 */int wait_exit_thr_num;/* 要销毁的线程个数 */int queue_front;/* task_queue队头下标 */int queue_rear;/* task_queue队尾下标 */int queue_size;/* task_queue队中实际任务数 */int queue_max_size;/* task_queue队列可容纳任务数上限 */int shutdown;/* 标志位,线程池使用状态,true或false */};void *threadpool_thread(void *threadpool);void *adjust_thread(void *threadpool);int is_thread_alive(pthread_t tid);int threadpool_free(threadpool_t *pool);//threadpool_create(3,100,100);threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size){int i;threadpool_t *pool = NULL;do {if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {printf("malloc threadpool fail");break;/*跳出do while*/}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num;/* 活着的线程数 初值=最小线程数 */pool->wait_exit_thr_num = 0;pool->queue_size = 0;/* 有0个产品 */pool->queue_max_size = queue_max_size;pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false;/* 不关闭线程池 *//* 根据最大线程上限数,给工作线程数组开辟空间, 并清零 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) {printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);/* 队列开辟空间 */pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);if (pool->task_queue == NULL) {printf("malloc task_queue fail\n");break;}/* 初始化互斥琐、条件变量 */if (pthread_mutex_init(&(pool->lock), NULL) != 0|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init the lock or cond fail\n");break;}//启动工作线程pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for (i = 0; i