本文共 5488 字,大约阅读时间需要 18 分钟。
http://www.cnblogs.com/danxi/p/6636095.html
想做一个多线程服务器测试程序,因此参考了github的一些实例,然后自己动手写了类似的代码来加深理解。
目前了解的线程池实现有2种思路:
第一种:
主进程创建一定数量的线程,并将其全部挂起,此时线程状态为idle,并将running态计数为0,等到任务可以执行了,就唤醒线程,此时线程状态为running,计数增加,如果计数达到最大线程数,就再创建一组空闲线程,等待新任务,上一组线程执行完退出,如此交替。
第二种:
采用生成者-消费者模式,主进程作为生成者,创建FIFO队列,在任务队列尾部添加任务,线程池作为消费者在队列头部取走任务执行,这之间有人会提到无锁环形队列,在单生成者单消费者的模式下是有效的,但是线程池肯定是多消费者同时去队列取任务,环形队列会造成挂死。
我的实例采用第二种方式实现,在某些应用场景下,允许一定时间内,任务排队的情况,重复利用已有线程会比较合适。
代码比较占篇幅,因此折叠在下面。
task.h:
1 #ifndef TASK_H 2 #define TASK_H 3 4 #include5 #include
6 7 using std::list; 8 9 struct task {10 void (*function) (void *);11 void *arguments;12 int id;13 };14 15 struct work_queue {16 work_queue(){17 pthread_mutex_init(&queue_lock, NULL);18 pthread_mutex_init(&queue_read_lock, NULL);19 pthread_cond_init(&queue_read_cond, NULL);20 qlen = 0;21 }22 23 ~work_queue() {24 queue.clear();25 pthread_mutex_destroy(&queue_read_lock);26 pthread_mutex_destroy(&queue_lock);27 pthread_cond_destroy(&queue_read_cond);28 }29 30 void push(task *tsk);31 task *pull();32 void post();33 void wait();34 35 private:36 int qlen;37 list< task * > queue;38 pthread_mutex_t queue_lock;39 pthread_mutex_t queue_read_lock;40 pthread_cond_t queue_read_cond;41 };42 43 #endif
task.cpp
#include "task.h"void work_queue::push(task *tsk) { pthread_mutex_lock(&queue_lock); queue.push_back(tsk); qlen++; pthread_cond_signal(&queue_read_cond); pthread_mutex_unlock(&queue_lock);}task* work_queue::pull() { wait(); pthread_mutex_lock(&queue_lock); task* tsk = NULL; if (qlen > 0) { tsk = *(queue.begin()); queue.pop_front(); qlen--; if (qlen > 0) pthread_cond_signal(&queue_read_cond); } pthread_mutex_unlock(&queue_lock); return tsk;}void work_queue::post() { pthread_mutex_lock(&queue_read_lock); pthread_cond_broadcast(&queue_read_cond); pthread_mutex_unlock(&queue_read_lock);}void work_queue::wait() { pthread_mutex_lock(&queue_read_lock); pthread_cond_wait(&queue_read_cond, &queue_read_lock); pthread_mutex_unlock(&queue_read_lock);}
threadpool.h
1 #ifndef THREAD_POOL_H 2 #define THREAD_POOL_H 3 4 #include "task.h" 5 #include6 7 using std::vector; 8 9 #define safe_delete(p) if (p) { delete p; p = NULL; }10 11 struct threadpool {12 threadpool(int size) : pool_size(size)13 , thread_list(size, pthread_t(0))14 , queue(NULL)15 , finish(false)16 , ready(0) {17 18 pthread_mutex_init(&pool_lock, NULL);19 }20 21 ~threadpool() {22 thread_list.clear();23 safe_delete(queue);24 pthread_mutex_destroy(&pool_lock);25 }26 27 void init();28 void destroy();29 static void* thread_run(void *tp);30 31 void incr_ready();32 void decr_ready();33 bool close() const;34 35 work_queue *queue;36 37 private:38 int pool_size;39 int ready;40 bool finish;41 pthread_mutex_t pool_lock;42 vector thread_list;43 };44 45 46 #endif
threadpool.cpp
1 /* 2 * threadpool.cpp 3 * 4 * Created on: 2017年3月27日 5 * Author: Administrator 6 */ 7 8 #include "threadpool.h" 9 10 11 void* threadpool::thread_run(void *tp) {12 threadpool *pool = (threadpool *) tp;13 pool->incr_ready();14 15 while(1) {16 task* tsk = pool->queue->pull();17 if (tsk) {18 (tsk->function)(tsk->arguments);19 delete tsk;20 tsk = NULL;21 }22 23 if (pool->close())24 break;25 }26 27 pool->decr_ready();28 29 return NULL;30 }31 32 void threadpool::incr_ready() {33 pthread_mutex_lock(&pool_lock);34 ready++;35 pthread_mutex_unlock(&pool_lock);36 }37 38 void threadpool::decr_ready() {39 pthread_mutex_lock(&pool_lock);40 ready--;41 pthread_mutex_unlock(&pool_lock);42 }43 44 bool threadpool::close() const {45 return finish;46 }47 48 void threadpool::init() {49 queue = new work_queue;50 if (!queue) {51 return;52 }53 54 for(int i; ipost();67 }68 }69 }
main.cpp
1 //============================================================================ 2 // Name : thread_pool.cpp 3 // Author : dancy 4 // Version : 5 // Copyright : Your copyright notice 6 // Description : Hello World in C++, Ansi-style 7 //============================================================================ 8 9 #include10 #include "threadpool.h"11 #include 12 #include 13 #include 14 15 using namespace std;16 17 void job(void *tsk){18 printf("job %-2d working on Thread #%u\n", ((task *)tsk)->id, (int)pthread_self());19 }20 21 task *make_task(void (*func) (void *), int id) {22 task *tsk = new task;23 if (!tsk)24 return NULL;25 26 tsk->function = func;27 tsk->arguments = (void *)tsk;28 tsk->id = id;29 30 return tsk;31 }32 33 int main() {34 threadpool tp(4);35 tp.init();36 37 for(int i=0; i<40; i++) 38 tp.queue->push(make_task(&job, i+1));39 40 tp.destroy();41 printf("all task has completed\n");42 return 0;43 }
以上代码需要在linux下编译,mingw下封装的pthread_t,多了一个void *指针,如果要适配还需要自己再封装一次。