EventLoop在网络库中的作用?

主线程也有一个线程在跑EventLoop对象的loop()函数,在这个函数内关注服务端的socketfd用来接收新的客户端socketfd连接。

将这个新的socketfd连接放到各个线程中并运行线程的EventLoop对象的loop()来关注已连接socketfd的可读可写事件。

所以EventLoop在网络库中的作用基本就是来关注fd事件执行回调函数的。

 

EventLoop对象的数据成员:

typedef std::vector<Channel*> ChannelList;

//线程是否有进入EventLoop的loop()函数
bool looping_; /* atomic */

//线程是否要退出EventLoop的loop()函数
std::atomic<bool> quit_;

//线程是否在执行激活状态channel通道的回调函数        
bool eventHandling_; /* atomic */

//EventLoop线程做的额外工作 
bool callingPendingFunctors_; /* atomic */

//loop函数循环的次数
int64_t iteration_;

//独一无二的线程标识,能区分不同进程间的不同线程
const pid_t threadId_;

//poll有事件到来返回的时间  
Timestamp pollReturnTime_;

//poll函数的具体调用
std::unique_ptr<Poller> poller_;

//定时器功能的实现
std::unique_ptr<TimerQueue> timerQueue_;

//用于唤醒阻塞在poll函数的线程自己
int wakeupFd_;

//这个channel对象是用wakeupFd_和唤醒后需要执行什么函数构造的
std::unique_ptr<Channel> wakeupChannel_;

//??还未知作用
boost::any context_;

// 返回所有有事件到来的channel通道
ChannelList activeChannels_;

//当前正在执行哪个channel回调函数的channel指针
Channel* currentActiveChannel_;

//互斥锁用来互斥地往pendingFunctors_添加额外任务
mutable MutexLock mutex_;

//EventLoop线程除了执行有事件到来的Channel的回调函数外,也会执行这个vector内的函数(我叫它为额外的工作)
std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);

EventLoop有什么功能?

 

1.返回所有文件描述符fd有事件(可读可写错误)的通道channel,并执行channel通道注册的回调函数。这是EventLoop对象所在线程的主要工作。

2.EventLoop对象所处的线程会阻塞在poll函数,利用eventfd唤醒自己来做一些额外的工作(例如:添加/删除一个新定时器任务)

3.利用timerfd(TimerId类、Timer类、TimerQueue类)实现线程定时执行任务的功能。

 

一、EventLoop的主要工作是调用激活的Channel的回调函数

流程图:

muduo网络库代码剖析——EventLoop类

可以看到Channel对象只是调用了handleEvent(),最终调用的函数是更上一层类或者说是Channel对象所属类注册给channel的回调函数。

二、EventLoop如何利用eventfd从poll函数中唤醒自己去做额外工作

在EventLoop的构造函数内就会设置wakeupChannel_可读事件激活时要回调的函数EventLoop::handleRead(),并将监听eventfd文件描述符事件。当可读事件到来的时候会调用

void EventLoop::handleRead()
{
  uint64_t one = 1;
  ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
  }
}

从eventupFd_文件描述符中取走数据,防止不断触发可读事件。

设置到wakeupChannel_可读事件是调用了:

void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

流程图:

muduo网络库代码剖析——EventLoop类

 

Channel类对象wakeupChannel_是如何被添加进监听列表的见:

 

 muduo网络库代码剖析——Channel类

 

muduo网络库代码剖析——Poller类、PollPoller类、EpollPoller类

 

三、利用timerfd(TimerId类、Timer类、TimerQueue类)实现线程定时执行任务的功能。

 既然EventLoop对象的主要任务是执行有事件的Channel的回调函数,那么就可以利用timerfd实现定时调用回调函数的功能;

 

muduo网络库代码剖析——TimerId类、Timer类、TimerQueue类

 

EventLoop.cc文件:

 1 #include "muduo/net/EventLoop.h"
 2 
 3 #include "muduo/base/Logging.h"
 4 #include "muduo/base/Mutex.h"
 5 #include "muduo/net/Channel.h"
 6 #include "muduo/net/Poller.h"
 7 #include "muduo/net/SocketsOps.h"
 8 #include "muduo/net/TimerQueue.h"
 9 
10 #include <algorithm>
11 
12 #include <signal.h>
13 #include <sys/eventfd.h>
14 #include <unistd.h>
15 
16 using namespace muduo;
17 using namespace muduo::net;
18 
19 namespace
20 {
21 __thread EventLoop* t_loopInThisThread = 0;
22 
23 const int kPollTimeMs = 10000;
24 
25 int createEventfd()
26 {
27   int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
28   if (evtfd < 0)
29   {
30     LOG_SYSERR << "Failed in eventfd";
31     abort();
32   }
33   return evtfd;
34 }
35 
36 #pragma GCC diagnostic ignored "-Wold-style-cast"
37 class IgnoreSigPipe
38 {
39  public:
40   IgnoreSigPipe()
41   {
42     ::signal(SIGPIPE, SIG_IGN);
43     // LOG_TRACE << "Ignore SIGPIPE";
44   }
45 };
46 #pragma GCC diagnostic error "-Wold-style-cast"
47 
48 IgnoreSigPipe initObj;
49 }  // namespace

知识点1:匿名命名空间

知识点2:static全局变量

全局对象会在main函数之前被先调用,所以一开始IgnoreSigPipe()构造函数就被调用了。

 1 EventLoop::EventLoop()
 2   : looping_(false),
 3     quit_(false),
 4     eventHandling_(false),
 5     callingPendingFunctors_(false),
 6     iteration_(0),
 7     threadId_(CurrentThread::tid()),
 8     poller_(Poller::newDefaultPoller(this)),
 9     timerQueue_(new TimerQueue(this)),
10     wakeupFd_(createEventfd()),
11     wakeupChannel_(new Channel(this, wakeupFd_)),
12     currentActiveChannel_(NULL)
13 {
14   LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
15   if (t_loopInThisThread)
16   {
17     LOG_FATAL << "Another EventLoop " << t_loopInThisThread
18               << " exists in this thread " << threadId_;
19   }
20   else
21   {
22     t_loopInThisThread = this;
23   }
24   wakeupChannel_->setReadCallback(
25       std::bind(&EventLoop::handleRead, this));
26   // we are always reading the wakeupfd
27   wakeupChannel_->enableReading();
28 }
29 
30 EventLoop::~EventLoop()
31 {
32   LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
33             << " destructs in thread " << CurrentThread::tid();
34   wakeupChannel_->disableAll();
35   wakeupChannel_->remove();
36   ::close(wakeupFd_);
37   t_loopInThisThread = NULL;
38 }

构造函数:

当eventfd有可读事件到来以后,EventLoop对象注册了它需要调用的函数:EventLoop::handleRead()

第8行:

Poller* Poller::newDefaultPoller(EventLoop* loop)
{
  if (::getenv("MUDUO_USE_POLL"))
  {
    return new PollPoller(loop);
  }
  else
  {
    return new EPollPoller(loop);
  }
}
依据环境变量选择poll还是epoll系统调用

相关文章: