EventLoop在网络库中的作用?
主线程也有一个线程在跑EventLoop对象的loop()函数,在这个函数内关注服务端的socketfd用来接收新的客户端socketfd连接。
将这个新的socketfd连接放到各个线程中并运行线程的EventLoop对象的loop()来关注已连接socketfd的可读可写事件。
所以EventLoop在网络库中的作用基本就是来关注fd事件并执行回调函数的。
EventLoop对象的数据成员:
typedef std::vector<Channel*> ChannelList; //线程是否有进入EventLoop的loop()函数 bool looping_; /* atomic */ //线程是否要退出EventLoop的loop()函数 std::atomic<bool> quit_; //线程是否在执行激活状态channel通道的回调函数 bool eventHandling_; /* atomic */ //EventLoop线程做的额外工作 bool callingPendingFunctors_; /* atomic */ //loop函数循环的次数 int64_t iteration_; //独一无二的线程标识,能区分不同进程间的不同线程 const pid_t threadId_; //poll有事件到来返回的时间 Timestamp pollReturnTime_; //poll函数的具体调用 std::unique_ptr<Poller> poller_; //定时器功能的实现 std::unique_ptr<TimerQueue> timerQueue_; //用于唤醒阻塞在poll函数的线程自己 int wakeupFd_; //这个channel对象是用wakeupFd_和唤醒后需要执行什么函数构造的 std::unique_ptr<Channel> wakeupChannel_; //??还未知作用 boost::any context_; // 返回所有有事件到来的channel通道 ChannelList activeChannels_; //当前正在执行哪个channel回调函数的channel指针 Channel* currentActiveChannel_; //互斥锁用来互斥地往pendingFunctors_添加额外任务 mutable MutexLock mutex_; //EventLoop线程除了执行有事件到来的Channel的回调函数外,也会执行这个vector内的函数(我叫它为额外的工作) std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
EventLoop有什么功能?
1.返回所有文件描述符fd有事件(可读可写错误)的通道channel,并执行channel通道注册的回调函数。这是EventLoop对象所在线程的主要工作。
2.EventLoop对象所处的线程会阻塞在poll函数,利用eventfd唤醒自己来做一些额外的工作(例如:添加/删除一个新定时器任务)
3.利用timerfd(TimerId类、Timer类、TimerQueue类)实现线程定时执行任务的功能。
一、EventLoop的主要工作是调用激活的Channel的回调函数
流程图:
可以看到Channel对象只是调用了handleEvent(),最终调用的函数是更上一层类或者说是Channel对象所属类注册给channel的回调函数。
二、EventLoop如何利用eventfd从poll函数中唤醒自己去做额外工作
在EventLoop的构造函数内就会设置wakeupChannel_可读事件激活时要回调的函数EventLoop::handleRead(),并将监听eventfd文件描述符事件。当可读事件到来的时候会调用
void EventLoop::handleRead() { uint64_t one = 1; ssize_t n = sockets::read(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; } }
从eventupFd_文件描述符中取走数据,防止不断触发可读事件。
设置到wakeupChannel_可读事件是调用了:
void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } }
流程图:
Channel类对象wakeupChannel_是如何被添加进监听列表的见:
muduo网络库代码剖析——Poller类、PollPoller类、EpollPoller类
三、利用timerfd(TimerId类、Timer类、TimerQueue类)实现线程定时执行任务的功能。
既然EventLoop对象的主要任务是执行有事件的Channel的回调函数,那么就可以利用timerfd实现定时调用回调函数的功能;
muduo网络库代码剖析——TimerId类、Timer类、TimerQueue类
EventLoop.cc文件:
1 #include "muduo/net/EventLoop.h" 2 3 #include "muduo/base/Logging.h" 4 #include "muduo/base/Mutex.h" 5 #include "muduo/net/Channel.h" 6 #include "muduo/net/Poller.h" 7 #include "muduo/net/SocketsOps.h" 8 #include "muduo/net/TimerQueue.h" 9 10 #include <algorithm> 11 12 #include <signal.h> 13 #include <sys/eventfd.h> 14 #include <unistd.h> 15 16 using namespace muduo; 17 using namespace muduo::net; 18 19 namespace 20 { 21 __thread EventLoop* t_loopInThisThread = 0; 22 23 const int kPollTimeMs = 10000; 24 25 int createEventfd() 26 { 27 int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 28 if (evtfd < 0) 29 { 30 LOG_SYSERR << "Failed in eventfd"; 31 abort(); 32 } 33 return evtfd; 34 } 35 36 #pragma GCC diagnostic ignored "-Wold-style-cast" 37 class IgnoreSigPipe 38 { 39 public: 40 IgnoreSigPipe() 41 { 42 ::signal(SIGPIPE, SIG_IGN); 43 // LOG_TRACE << "Ignore SIGPIPE"; 44 } 45 }; 46 #pragma GCC diagnostic error "-Wold-style-cast" 47 48 IgnoreSigPipe initObj; 49 } // namespace
知识点1:匿名命名空间
知识点2:static全局变量
全局对象会在main函数之前被先调用,所以一开始IgnoreSigPipe()构造函数就被调用了。
1 EventLoop::EventLoop() 2 : looping_(false), 3 quit_(false), 4 eventHandling_(false), 5 callingPendingFunctors_(false), 6 iteration_(0), 7 threadId_(CurrentThread::tid()), 8 poller_(Poller::newDefaultPoller(this)), 9 timerQueue_(new TimerQueue(this)), 10 wakeupFd_(createEventfd()), 11 wakeupChannel_(new Channel(this, wakeupFd_)), 12 currentActiveChannel_(NULL) 13 { 14 LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_; 15 if (t_loopInThisThread) 16 { 17 LOG_FATAL << "Another EventLoop " << t_loopInThisThread 18 << " exists in this thread " << threadId_; 19 } 20 else 21 { 22 t_loopInThisThread = this; 23 } 24 wakeupChannel_->setReadCallback( 25 std::bind(&EventLoop::handleRead, this)); 26 // we are always reading the wakeupfd 27 wakeupChannel_->enableReading(); 28 } 29 30 EventLoop::~EventLoop() 31 { 32 LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_ 33 << " destructs in thread " << CurrentThread::tid(); 34 wakeupChannel_->disableAll(); 35 wakeupChannel_->remove(); 36 ::close(wakeupFd_); 37 t_loopInThisThread = NULL; 38 }
构造函数:
当eventfd有可读事件到来以后,EventLoop对象注册了它需要调用的函数:EventLoop::handleRead()
第8行:
Poller* Poller::newDefaultPoller(EventLoop* loop) { if (::getenv("MUDUO_USE_POLL")) { return new PollPoller(loop); } else { return new EPollPoller(loop); } }