1.首先我们创建一个任务类。

//回调函数
typedef int(*cal_t)(int,int);

class Task{
  private:
    int x;//操作数1
    int y;//操作数2
    int z;//运算结果
    cal_t handler_task;//运算操作
  public:
    Task(int a,int b,cal_t handler_task_)//构造函数
    :x(a),y(b),handler_task(handler_task_)
    {

    }
   //执行操作
     void Run()
    {
      z = handler_task(x,y);
    }
   //将结果输出
    void Show()
    {
      cout<<"thread : "<<pthread_self()<<"Task finish,result is : "<<z<<endl;
    }

    ~Task()
    {

    }
};


2.线程池类

1)线程池的成员

class  ThreadPool{
   private:
     queue<Task> Task_Queue;//将任务添加在一个队列中
     bool IsStop;//线程池的工作状态
     int ThreadNum;//线程池内的线程数量
     //为了线程安全所以需要互斥锁与条件变量保护临界资源
     pthread_mutex_t lock;
     pthread_cond_t cond;
};

2)根据测试实现线程池内的成员函数

#include"ThreadPool.hpp"

#define NUM 5 //设置线程池的线程数量
//任务操作
int Add(int x,int y)
{
  return x + y;
}

int main()
{
//创建一个线程池类
  ThreadPool *tp =new ThreadPool(NUM);
  //线程池的初始化
  tp -> InitThreadPool();
//为了便于理解,我们使用while循环先一直往任务队列添加任务。
  int count =1;
  while(1){
    sleep(1);
    Task t(count,count-1,Add);
    tp->AddTask(t);
    count++;
  }
  return 0;
}

《1》线程池类的构造函数与析构函数

//构造函数初始化时,将IsStop状态设置为fasle,否则线程池无法工作。
 ThreadPool(int num):ThreadNum(num),IsStop(false)
     {}

《2》线程池的初始化

 void InitThreadPool()
     {
     //对互斥锁以及条件变脸初始化。
       pthread_mutex_init(&lock,NULL);
       pthread_cond_init(&cond,NULL);
       //设置变量控制线程的创建
       int i = 0;
       for(;i<ThreadNum;i++)
       {
          pthread_t tid;
          pthread_create(&tid,NULL,route,(void *)this);
       }
     }

//析构函数完成互斥锁以及条件变量的销毁
~ThreadPool()
     {
          pthread_mutex_destroy(&lock);
          pthread_cond_destroy(&cond);
     }

《3》初始化完后实现线程创建中的route函数

//C++中static函数内无this指针,C++类对线程操作要用static修饰
 static  void *route(void *arg)
     {
        ThreadPool *tp = (ThreadPool*)arg;
        //将线程自己分离出来
        pthread_detach(pthread_self());
        while(1){
        //为了保证线程安全,将任务队列加锁
          tp->LockQueue();
          //当任务队列为空时,线程先进入等待状态
          if(tp->IsEmpty()){
              tp->IdleThread();
          }
          //当队列有任务时,创建一个任务类并得到该任务,也由此得出需要一个AddTask();成员函数。
          Task t = tp->GetTask();
          //取到会解锁
          tp->UnlockQueue();
          //执行任务操作
          t.Run();
          //将任务结果显示
          t.Show();
        }
     }

《4》依次实LockQueue();IsEmpty();IDleThread();GetTask();UnlockQueue();

    //任务队列加锁
     void LockQueue()
     {
       pthread_mutex_lock(&lock);
     }
     //任务解锁解锁
     void UnlockQueue()
     {
       pthread_mutex_unlock(&lock);
     }
     //判断任务是否为空
     bool IsEmpty()
     {
       return Task_Queue.size() == 0 ? true : false;
     }
     //加任务
     void AddTask(Task &t)
     {
      Task_Queue.push(t);
      //当得到一个任务时便传送信号给一个进程
      NoticOneThread();
      UnlockQueue();
     }
     //传送信号给一个进程
      void NoticOneThread()
     {
       pthread_cond_signal(&cond);
     }
        //等待队列
     void IdleThread()
     {
       pthread_cond_wait(&cond,&lock);
     }
    //得到任务
     Task GetTask()
     {
       Task t = Task_Queue.front();
       Task_Queue.pop();
       return t;
     }

《5》暂停分析
貌似我们已经实现了一个加法的简易线程池,首先创建一个线程池对象,然后进行初始化,创建设定数的线程,并且它们都处于等待状态,当有任务时,它们依次被唤醒,执行任务。但是我们测试用的是while(1)假如没有了任务,线程便一直处于了等待状态不会退出,于是我们还要实现一个Stop()成员函数,若停止了广播NoticAllThread()中的线程让他们依次退出,并修改IdleThread()以及AddTask()函数进行IsStop条件判断。

 void Stop()
     {
       LockQueue();
       IsStop = true;
       UnlockQueue();
       while(ThreadNum > 0){
         NoticAllThread();
       }
     }
 void IdleThread()
     {
       if(IsStop){
         UnlockQueue();
         ThreadNum--;
         pthread_exit((void *)0);
         cout<<"pthread "<<pthread_self() <<"quit"<<endl;
         return ;
       }
       pthread_cond_wait(&cond,&lock);
     }
       void AddTask(Task &t)
     {
       if(IsStop)
       {
         UnlockQueue();
         return ;
       }
      Task_Queue.push(t);
      NoticOneThread();
      UnlockQueue();
     }
    
     void NoticAllThread()
     {
       pthread_cond_broadcast(&cond);
     }


3.完整代码以及测试

头文件

#ifndef __THREADPOLL_HPP__
#define __THREADPOLL_HPP__ 

#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<queue>

using namespace std;


typedef int(*cal_t)(int,int);

class Task{
  private:
    int x;
    int y;
    int z;
    cal_t handler_task;
  public:
    Task(int a,int b,cal_t handler_task_)
    :x(a),y(b),handler_task(handler_task_)
    {

    }

    void Run()
    {
      z = handler_task(x,y);
    }

    void Show()
    {
      cout<<"thread : "<<pthread_self()<<"Task finish,result is : "<<z<<endl;
    }

    ~Task()
    {

    }
};

class  ThreadPool{
   private:
     queue<Task> Task_Queue;
     bool IsStop;
     int ThreadNum;
     pthread_mutex_t lock;
     pthread_cond_t cond;



  private:
    static  void *route(void *arg)
     {
        ThreadPool *tp = (ThreadPool*)arg;
        pthread_detach(pthread_self());
        while(1){
          tp->LockQueue();
          if(tp->IsEmpty()){
              tp->IdleThread();
          }
          Task t = tp->GetTask();
          tp->UnlockQueue();
          
          t.Run();
          t.Show();
        }
     }

     void NoticOneThread()
     {
       pthread_cond_signal(&cond);
     }

     void NoticAllThread()
     {
       pthread_cond_broadcast(&cond);
     }
   public:
     ThreadPool(int num):ThreadNum(num),IsStop(false)
     {}

     void InitThreadPool()
     {
       pthread_mutex_init(&lock,NULL);
       pthread_cond_init(&cond,NULL);
       int i = 0;
      
       
       for(;i<ThreadNum;i++)
       {
          pthread_t tid;
          pthread_create(&tid,NULL,route,(void *)this);
       }
     }

     ~ThreadPool()
     {
          pthread_mutex_destroy(&lock);
          pthread_cond_destroy(&cond);
     }
    

     void LockQueue()
     {
       pthread_mutex_lock(&lock);
     }

     void UnlockQueue()
     {
       pthread_mutex_unlock(&lock);
     }
     
     bool IsEmpty()
     {
       return Task_Queue.size() == 0 ? true : false;
     }

     void AddTask(Task &t)
     {
       if(IsStop)
       {
         UnlockQueue();
         return ;
       }
      Task_Queue.push(t);
      NoticOneThread();
      UnlockQueue();
     }
        
     void IdleThread()
     {
       if(IsStop){
         UnlockQueue();
         ThreadNum--;
         pthread_exit((void *)0);
         cout<<"pthread "<<pthread_self() <<"quit"<<endl;
         return ;
       }
       pthread_cond_wait(&cond,&lock);
     }

     Task GetTask()
     {
       Task t = Task_Queue.front();
       Task_Queue.pop();
       return t;
     }

     void Stop()
     {
       LockQueue();
       IsStop = true;
       UnlockQueue();
       while(ThreadNum > 0){
         NoticAllThread();
       }
     }

};



#endif 

测试文件
我们只设置九个任务。

#include"ThreadPool.hpp"

#define NUM 5

int Add(int x,int y)
{
  return x + y;
}

int main()
{
  ThreadPool *tp =new ThreadPool(NUM);
  tp -> InitThreadPool();

  int count =1;
  int u = 0;
  for(;u<1;u++){
    sleep(1);
    Task t(count,count-1,Add);
    tp->AddTask(t);
    count++;
  }
  return 0;
}

结果
在gdb下观察:
1)首先创建5个线程。
实现加法操作的简易线程池
2)5个线程桉序执行任务。
实现加法操作的简易线程池
3)没有任务了线程退出。
实现加法操作的简易线程池

相关文章: