引言
本文讲解的EventLoopThreadPool类,实际上就是Multiple-Reactor模型
即:
- mainReactor用于listen新的客户端连接
- 接收到的连接,通过RR方式选择并按插在subReactor上,由subReactor处理该连接的读写事件
总结一句话:EventLoopThreadPool的功能是开启若干个IO线程,并让每一个IO线程都处于事件循环状态
使用EventLoopThreadPool
EventLoop loop;
// 创建线程池
EventLoopThreadPool pool(&loop, "");
// 设置线程个数
poll.setThreadNum(100);
// 启动线程池
poll.start();
EventLoopThreadPool.h
class EventLoop;
class EventLoopThread;
class EventLoopThreadPool : noncopyable
{
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
//构造、析构函数
EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);
~EventLoopThreadPool();
//设置subEventLoop个数
void setThreadNum(int numThreads) { numThreads_ = numThreads; }
//开启subEventLoop
void start(const ThreadInitCallback& cb = ThreadInitCallback());
//获取EventLoop的3种方法
EventLoop* getNextLoop(); //RR
EventLoop* getLoopForHash(size_t hashCode);//HashCode
std::vector<EventLoop*> getAllLoops(); //全部获取
bool started() const //是否已经启动
{ return started_; }
const string& name() const //EventLoopThreadPool的name
{ return name_; }
private:
EventLoop* baseLoop_; //mainLoop
string name_;
bool started_;
int numThreads_; //线程数
int next_; //新连接到来,所选择的EventLoop对象下标
std::vector<std::unique_ptr<EventLoopThread>> threads_; //IO线程列表
std::vector<EventLoop*> loops_; //EventLoop列表
};
代码解析
二话不说,先看几个比较关键的函数
class EventLoopThreadPool : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop*)> ThreadInitCallback;
EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);
//设置subReactor的个数
void setThreadNum(int numThreads) { numThreads_ = numThreads; }
//创建&&开启subReactor
void start(const ThreadInitCallback& cb = ThreadInitCallback());
//........
}
- 构造函数:初始化成员变量
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
: baseLoop_(baseLoop),
name_(nameArg),
started_(false),
numThreads_(0),
next_(0)
{
}
-
start() 启动线程池
(1) 创建numThreads_个线程,每个线程都创建一个事件循环
(2) 每个subEventLoop都开启事件循环,即调用loop()
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ = true;
//case 1:numThreads_>0 ⇒ 创建subEventLoop
//创建numThreads_个线程,每个线程都创建一个事件循环
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
//创建事件循环subEventLoop
EventLoopThread* t = new EventLoopThread(cb, buf);
//保存事件循环
threads_.push_back(std::unique_ptr<EventLoopThread>(t));
//开启每个事件循环
loops_.push_back(t->startLoop()); //最终,子线程调用了loop()监听事件
}
//case 2:numThreads_=0 ⇒ 没创建subEventLoop
//EventLoopThreadPool退化成EventLoopThread类,仅有一个mainEventLoop
if (numThreads_ == 0 && cb)
{
cb(baseLoop_);
}
}
- 获取EventLoop的3种方法
EventLoop* getNextLoop(); //RR
EventLoop* getLoopForHash(size_t hashCode);//HashCode
std::vector<EventLoop*> getAllLoops(); //全部获取
示例代码
#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Thread.h>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
void print(EventLoop* p = NULL)
{
printf("main(): pid = %d, tid = %d, loop = %p\n",
getpid(), CurrentThread::tid(), p);
}
void init(EventLoop* p)
{
printf("init(): pid = %d, tid = %d, loop = %p\n",
getpid(), CurrentThread::tid(), p);
}
int main()
{
print();
{
printf("Single thread %p:\n", &loop);
EventLoopThreadPool model(&loop, "single");
model.setThreadNum(0);
model.start(init);
assert(model.getNextLoop() == &loop);
assert(model.getNextLoop() == &loop);
assert(model.getNextLoop() == &loop);
}
{
printf("Another thread:\n");
EventLoopThreadPool model(&loop, "another");
model.setThreadNum(1);
model.start(init);
EventLoop* nextLoop = model.getNextLoop();
nextLoop->runAfter(2, std::bind(print, nextLoop));
assert(nextLoop != &loop);
assert(nextLoop == model.getNextLoop());
assert(nextLoop == model.getNextLoop());
::sleep(3);
}
{
printf("Three threads:\n");
EventLoopThreadPool model(&loop, "three");
model.setThreadNum(3);
model.start(init);
EventLoop* nextLoop = model.getNextLoop();
//在mainEventLoop的线程中,调用nextLoop的runInLoop函数
//转去调用queueInLoop()、wakeup()、doPendingFunctors()
nextLoop->runInLoop(std::bind(print, nextLoop));
assert(nextLoop != &loop);
assert(nextLoop != model.getNextLoop());
assert(nextLoop != model.getNextLoop());
assert(nextLoop == model.getNextLoop());
}
loop.loop();
}