引言

本文讲解的EventLoopThreadPool类,实际上就是Multiple-Reactor模型
即:

  1. mainReactor用于listen新的客户端连接
  2. 接收到的连接,通过RR方式选择并按插在subReactor上,由subReactor处理该连接的读写事件

总结一句话:EventLoopThreadPool的功能是开启若干个IO线程,并让每一个IO线程都处于事件循环状态
muduo_net代码剖析之EventLoopThreadPool

使用EventLoopThreadPool

EventLoop loop;
// 创建线程池
EventLoopThreadPool pool(&loop, "");
// 设置线程个数
poll.setThreadNum(100);
// 启动线程池
poll.start();

EventLoopThreadPool.h

class EventLoop;
class EventLoopThread;

class EventLoopThreadPool : noncopyable
{
 public:
  typedef std::function<void(EventLoop*)> ThreadInitCallback;
  //构造、析构函数
  EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);
  ~EventLoopThreadPool();
  
  //设置subEventLoop个数
  void setThreadNum(int numThreads) { numThreads_ = numThreads; }
  //开启subEventLoop
  void start(const ThreadInitCallback& cb = ThreadInitCallback());

  //获取EventLoop的3种方法
  EventLoop* getNextLoop(); //RR
  EventLoop* getLoopForHash(size_t hashCode);//HashCode
  std::vector<EventLoop*> getAllLoops(); //全部获取

  bool started() const //是否已经启动
  { return started_; }

  const string& name() const //EventLoopThreadPool的name
  { return name_; }

private:
  EventLoop* baseLoop_; //mainLoop
  string name_;
  bool started_;
  int numThreads_; //线程数
  int next_; //新连接到来,所选择的EventLoop对象下标
  std::vector<std::unique_ptr<EventLoopThread>> threads_; //IO线程列表
  std::vector<EventLoop*> loops_; //EventLoop列表
};

代码解析

二话不说,先看几个比较关键的函数

class EventLoopThreadPool : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);
  
  //设置subReactor的个数
  void setThreadNum(int numThreads) { numThreads_ = numThreads; }
  //创建&&开启subReactor
  void start(const ThreadInitCallback& cb = ThreadInitCallback());
  //........
}
  1. 构造函数:初始化成员变量
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
  : baseLoop_(baseLoop),
    name_(nameArg),
    started_(false),
    numThreads_(0),
    next_(0)
{
}
  1. start() 启动线程池
    (1) 创建numThreads_个线程,每个线程都创建一个事件循环
    (2) 每个subEventLoop都开启事件循环,即调用loop()
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread();
  started_ = true;

//case 1:numThreads_>0 ⇒ 创建subEventLoop
  //创建numThreads_个线程,每个线程都创建一个事件循环
  for (int i = 0; i < numThreads_; ++i) 
  {
    char buf[name_.size() + 32];
    snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
    //创建事件循环subEventLoop
    EventLoopThread* t = new EventLoopThread(cb, buf); 
    //保存事件循环
    threads_.push_back(std::unique_ptr<EventLoopThread>(t)); 
    //开启每个事件循环
    loops_.push_back(t->startLoop()); //最终,子线程调用了loop()监听事件
  }

//case 2:numThreads_=0 ⇒ 没创建subEventLoop
  //EventLoopThreadPool退化成EventLoopThread类,仅有一个mainEventLoop
  if (numThreads_ == 0 && cb) 
  {
    cb(baseLoop_);
  }
}
  1. 获取EventLoop的3种方法
EventLoop* getNextLoop(); //RR
EventLoop* getLoopForHash(size_t hashCode);//HashCode
std::vector<EventLoop*> getAllLoops(); //全部获取

示例代码

#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Thread.h>

#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

void print(EventLoop* p = NULL)
{
  printf("main(): pid = %d, tid = %d, loop = %p\n",
         getpid(), CurrentThread::tid(), p);
}

void init(EventLoop* p)
{
  printf("init(): pid = %d, tid = %d, loop = %p\n",
         getpid(), CurrentThread::tid(), p);
}

int main()
{
  print();

  {
    printf("Single thread %p:\n", &loop);
    EventLoopThreadPool model(&loop, "single");
    model.setThreadNum(0);
    model.start(init);
    assert(model.getNextLoop() == &loop);
    assert(model.getNextLoop() == &loop);
    assert(model.getNextLoop() == &loop);
  }

  {
    printf("Another thread:\n");
    EventLoopThreadPool model(&loop, "another");
    model.setThreadNum(1);
    model.start(init);
    EventLoop* nextLoop = model.getNextLoop();
    
    nextLoop->runAfter(2, std::bind(print, nextLoop));
    
    assert(nextLoop != &loop);
    assert(nextLoop == model.getNextLoop());
    assert(nextLoop == model.getNextLoop());
    ::sleep(3);
  }

  {
    printf("Three threads:\n");
    EventLoopThreadPool model(&loop, "three");
    model.setThreadNum(3);
    model.start(init);
    EventLoop* nextLoop = model.getNextLoop();
    
    //在mainEventLoop的线程中,调用nextLoop的runInLoop函数
    //转去调用queueInLoop()、wakeup()、doPendingFunctors()
    nextLoop->runInLoop(std::bind(print, nextLoop));
    
    assert(nextLoop != &loop);
    assert(nextLoop != model.getNextLoop());
    assert(nextLoop != model.getNextLoop());
    assert(nextLoop == model.getNextLoop());
  }

  loop.loop();
}

相关文章:

  • 2022-12-23
  • 2021-12-15
  • 2022-12-23
  • 2021-06-16
  • 2021-12-13
  • 2022-12-23
  • 2022-12-23
  • 2021-06-11
猜你喜欢
  • 2021-07-18
  • 2021-12-28
  • 2021-12-23
  • 2021-10-07
  • 2021-06-17
  • 2021-11-29
  • 2021-10-14
相关资源
相似解决方案