转自:https://blog.csdn.net/analogous_love/article/details/53319815

博主的代码是C++写的,本人改成C语言的。

服务器开发——Reactor模式

 

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>  //for htonl() and htons()
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <signal.h>     //for signal()
#include <pthread.h>
#include <errno.h>
#include <time.h>
#include <stdbool.h>  // for bool
#include <sys/stat.h> 
#include <sys/syscall.h> //for gettid

#define gettid() syscall(__NR_gettid)

#define WORKER_THREAD_NUM   5
 
#define min(a, b) ((a <= b) ? (a) : (b)) 
 
int g_epollfd = 0;
bool g_bStop = false;
int g_listenfd = 0;
pthread_t g_acceptthreadid = 0;
pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };
pthread_cond_t g_acceptcond;
pthread_mutex_t g_acceptmutex;
 
pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;
pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;
 
pthread_mutex_t g_clientmutex;
 
typedef struct g_listClients_
{
	int fd;
	struct g_listClients_ *next;
}g_listClients;


static g_listClients *head = NULL;

void prog_exit(int signo)
{
    signal(SIGINT, SIG_IGN);//SIG_IGN 忽略信号的处理程序
    //signal(SIGKILL, SIG_IGN);//该信号不能被阻塞、处理或者忽略
    signal(SIGTERM, SIG_IGN);
 
    printf("program recv signal %d  to exit\n" ,signo);
 
    g_bStop = true;
 
	//将g_listenfd 从g_epollfd 事件集合删除
	epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);
 
    //shutdown 可以立即关闭连接,SHUT_RDWR表示同时关闭读和写
	//但是并不会关闭文件描述符,关闭描述符还是需要close函数
	//防止dup或者fork系统调用将fd计数增加情况
    shutdown(g_listenfd, SHUT_RDWR);
	//close系统调用并非立即关闭一个连接,而是将fd的引用计数减1
	//只有当fd的计数引用为0时才真正关闭fd,读和写全部关闭
    close(g_listenfd);
    close(g_epollfd);
 
    pthread_cond_destroy(&g_acceptcond);
    pthread_mutex_destroy(&g_acceptmutex);
    
    pthread_cond_destroy(&g_cond);
    pthread_mutex_destroy(&g_mutex);
 
    pthread_mutex_destroy(&g_clientmutex);
	
	if(head != NULL)
	{
		free(head);
		head = NULL;
	}
	printf("prog_exit !\n");
}
 
bool create_server_listener(const char* ip, short port)
{
	//socket函数第一个参数也经常看到设置为PF_INET,
	//在Unix/Linux系统中,在不同的版本中这两者有微小差别.
	//对于BSD,是AF,对于POSIX是PF.其实值是一样的影响不大
    g_listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (g_listenfd == -1)
        return false;
 
	//SO_REUSEADDR和SO_REUSEPORT两个标志是设置地址和端口复用,
	//如果不设置当我们重启程序时,如果有和客户端通信,然后会经过四次挥手进入TIME_WAIT状态
	//会有2min的MSL,在这2min中端口不能使用
    int on = 1;
    setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
    setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));
 
    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr)); 
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    if (bind(g_listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1)
        return false;
	//自内核版本2.2之后,listen第二个参数表示全连接的个数
	// proc/sys/net/ipv4/tcp_max_syn_backlog 中定义了半连接的个数
    if (listen(g_listenfd, 50) == -1)
        return false;
	//epoll_create 参数用来告诉内核这个监听的数目一共有多大(2.6.27版本内核之后该参数忽略不用)
	//不同于select第一个参数(最大监听的fd+1)
    g_epollfd = epoll_create(10);
    if (g_epollfd == -1)
        return false;
 
    struct epoll_event e;
    memset(&e, 0, sizeof(e));
	//events 表示epoll事件,EPOLLIN 可读事件 EPOLLRDHUP 套接字对端关闭
    e.events = EPOLLIN | EPOLLRDHUP;
    e.data.fd = g_listenfd;
	//往g_epollfd事件表中注册g_listenfd上的可读和关闭事件
    if (epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)
	{
		return false;
	}
      
    return true;
}
 
void release_client(int clientfd)
{
    if (epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)
        printf("release client socket failed as call epoll_ctl failed \n");
 
    close(clientfd);
}
 
void* accept_thread_func(void* arg)
{   
    while (!g_bStop)
    {
        //在pthread_cond_wait之前一定要加锁
		//pthread_cond_wait会先解除之前的pthread_mutex_lock锁定的mtx
		//然后阻塞在等待对列里休眠,直到再次被唤醒(大多数情况下是等待的条件成立而被唤醒,唤醒后
		//该进程会先锁定pthread_mutex_lock(&mtx);,再读取资源
		pthread_mutex_lock(&g_acceptmutex);
        pthread_cond_wait(&g_acceptcond, &g_acceptmutex);
        //有可能会存在虚假唤醒,要注意
        struct sockaddr_in clientaddr;
        socklen_t addrlen;
        int newfd = accept(g_listenfd, (struct sockaddr *)&clientaddr, &addrlen);
        pthread_mutex_unlock(&g_acceptmutex);
        if (newfd == -1)
            continue;
 
        printf("new client connected: [%s] : [%d]\n", inet_ntoa(clientaddr.sin_addr),ntohs(clientaddr.sin_port));
 
        //将新socket设置为non-blocking
		//F_GETFL 获取权限  F_SETFL设置权限
        int oldflag = fcntl(newfd, F_GETFL, 0);
        int newflag = oldflag | O_NONBLOCK;
        if (fcntl(newfd, F_SETFL, newflag) == -1)
        {
            printf("fcntl error, oldflag = [%d] , newflag = [%d]\n", oldflag ,newflag);
            continue;
        }
 
        struct epoll_event e;
        memset(&e, 0, sizeof(e));
		
		//设置可读事件 关闭连接事件 和边缘触发模式
		//缺省模式是水平触发即当被监控的文件描述符上有可读写事件发生时
		//epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小
		//那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写.
		//边缘触发即当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写
		//如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,
		//它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你
        e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
        e.data.fd = newfd;
        if (epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)
        {
           printf("epoll_ctl error, fd = %d \n", newfd );
        }
    }
 
    return NULL;
}
 
 
void* worker_thread_func(void* arg)
{   

    while (!g_bStop)
    {
        int clientfd;
        pthread_mutex_lock(&g_clientmutex);
        while (head == NULL) //while循环是防止意外唤醒
             pthread_cond_wait(&g_cond, &g_clientmutex);
		 
        clientfd = head->fd;
        head = head->next;
        pthread_mutex_unlock(&g_clientmutex);
		
		//gettid 获取的是内核中线程ID,而pthread_self 是posix描述的线程ID
		printf("in pthread_self = [%ld] gettid= [%ld]\n", pthread_self(), (long)gettid());
        char buff[256];
		char sendbuf[1024];
		int sendlen = 0;
        bool bError = false;
		int nRecv = 0, count = 0;
        memset(buff, 0, sizeof(buff));
		do
		{
			nRecv = recv(clientfd, buff + count, 256 - count, 0);
			if (nRecv == -1)
			{
				if (errno == EWOULDBLOCK) //EWOULDBLOCK 用于非阻塞模式,不需要重新读或者写 ,EINTR:指操作被中断唤醒,需要重新读/写
					break;
				else
				{
					printf("recv error, client disconnected, fd = [%d]\n", clientfd);
					release_client(clientfd);
					bError = true;
					break;
				}
					
			}
			//对端关闭了socket,这端也关闭。
			else if (nRecv == 0)
			{
				printf("peer closed, client disconnected, fd = %d \n" ,clientfd);
				release_client(clientfd);
				bError = true;
				break;
			}
			count = count + nRecv;
		}while(nRecv > 0 );
         
        //出错了,就不要再继续往下执行了
        if (bError)
            continue;
        
       printf("client msg:[%d] %s\n", count, buff);
 
        //将消息加上时间标签后发回
        time_t now = time(NULL);
        struct tm* nowstr = localtime(&now);

		memset(sendbuf, 0, sizeof(sendbuf));
		sprintf(sendbuf, "%04d-%02d-%02d %02d:%02d:%02d ", nowstr->tm_year + 1900, nowstr->tm_mon + 1, nowstr->tm_mday,	
		nowstr->tm_hour, nowstr->tm_min, nowstr->tm_sec);
        strcat(sendbuf + strlen(sendbuf), "server reply:");
		strcat(sendbuf, buff);
        sendlen  = strlen(sendbuf);
		int sendcount = 0;
        int nSent = 0;
		do
		{
			nSent = send(clientfd , sendbuf + sendcount, sendlen - sendcount, 0);
			if (nSent == -1)
			{
				if (errno == EWOULDBLOCK)
				{
					sleep(10);
					continue;
				}
				else
				{
					printf("send error, fd = [%d] \n", clientfd);
					release_client(clientfd);
					break;
				}   
			}          
			sendcount += nSent;
		}while(sendcount < sendlen);
		
        printf("send [%d]:%s\n", sendcount, sendbuf);
    }
 
    return NULL;
}
 
bool daemon_run()
{
    int pid;
    signal(SIGCHLD, SIG_IGN);
    //创建一个守护进程要遵循以下步骤
	 // 1、fork创建子进程
    pid = fork();
    if (pid < 0)
    {
        printf("fork error \n");
        exit(-1);
    }
    //父进程退出,子进程独立运行
    else if (pid > 0) 
	{
        exit(0);
    }
    //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,
    //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。

	//2、umask设置文件权限掩码
	//设置0表示当前进程创建文件或者目录的最大操作权限为(~0) & mode 即0777&mode, mode为系统调用open的第三个参数
	umask(0);
	
	//3、setsid 创建新回话,设置本进程为进程组的首领
	//执行setsid()之后,child将重新获得一个新的会话(session)id。
    //这时parent退出之后,将不会影响到child了。
	 pid_t sid = setsid();
	if(sid < 0)
	{
		return false;
	}
    //4、切换工作目录
	if((chdir("/")) < 0)
	{
		return false;
	}
	//5、关闭标准输入设备、标准输出设备和标准错误输出设备并重定向到/dev/null文件
	int i;
    for( i = 0; i < 3; ++i)
    {
        close(i);
        open("/dev/null", O_RDWR);
        dup(0);
        dup(0);
    }
	return true;
	//上述功能也可以通过库函数daemon(0, 0)完成
}
 
 
int main(int argc, char* argv[])
{  
    short port = 0;
    int ch;
    bool bdaemon = false;
    while ((ch = getopt(argc, argv, "p:d")) != -1)
    {
        switch (ch)
        {
        case 'd':
            bdaemon = true;
            break;
        case 'p':
            port = atol(optarg);
            break;
        }
    }
 
    if (bdaemon)
        if(!daemon_run())
		{
			printf("daemon run error!\n");
			return -1;
		}
 
    if (port == 0)
        port = 12345;
     
    if (!create_server_listener("0.0.0.0", port))
    {
        printf("Unable to create listen server: ip=0.0.0.0, port= [%d] \n", port);
        return -1;
    }
 
    //设置信号处理
    signal(SIGCHLD, SIG_DFL);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT, prog_exit);
    //signal(SIGKILL, prog_exit);//该信号不能被阻塞、处理或者忽略
    signal(SIGTERM, prog_exit);
	
	//pthread_cond_init 和直接赋值为PTHREAD_COND_INITIALIZER效果一样 
    pthread_cond_init(&g_acceptcond, NULL);
	
	//pthread_mutex_init 和直接赋值为PTHREAD_MUTEX_INITIALIZER效果一样 
    pthread_mutex_init(&g_acceptmutex, NULL);
 
    pthread_cond_init(&g_cond, NULL);
    pthread_mutex_init(&g_mutex, NULL);
 
    pthread_mutex_init(&g_clientmutex, NULL);
     
    pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);
    //启动工作线程
    for (int i = 0; i < WORKER_THREAD_NUM; ++i)
    {
        pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);
		printf("creat thread [%ld]\n", g_threadid[i]);
    }
 
    while (!g_bStop)
    {       
        struct epoll_event ev[1024];
		//1024为最大监听描述符的个数,返回值为就绪的描述符的个数,就绪的文件描述符存在ev数组中
        int n = epoll_wait(g_epollfd, ev, 1024, 10);
        if (n == 0)//没有就绪事件
            continue;
        else if (n < 0)//出错
        {
            printf("epoll_wait error \n");
            continue;
        }
 
        for (int i = 0; i < n; ++i)
        {
            //通知接收连接线程接收新连接
            if (ev[i].data.fd == g_listenfd)
				//pthread_cond_signal不会有“惊群现象”产生,他最多只给一个线程发信号
				//假如有多个线程正在阻塞等待着这个条件变量的话,那么是根据各等待线程优先级的高低确定哪个线程接收到信号开始继续执行
				//如果各线程优先级相同,则根据等待时间的长短来确定哪个线程获得信号
                pthread_cond_signal(&g_acceptcond);
            //通知普通工作线程接收数据
            else
            {   
				g_listClients new_node;           
                pthread_mutex_lock(&g_clientmutex);              
                //g_listClients.push_back(ev[i].data.fd);
				if(head == NULL) //头插入链表可能会导致先插入的fd不能及时处理
				{
					head = (g_listClients *)malloc(sizeof(g_listClients));
					if(head == NULL)
					{
						printf("malloc error!\n");
						pthread_mutex_unlock(&g_clientmutex);
						break;
					}
					head->fd = ev[i].data.fd;
					head->next = NULL;
				}
				else 
				{
					new_node.fd = ev[i].data.fd;
					new_node.next = head;
					head = &new_node;
				}
                pthread_mutex_unlock(&g_clientmutex);
                pthread_cond_signal(&g_cond);
            }
                
        }
 
    }
    free(head);
	head = NULL;
    return 0;
}
  

 

相关文章:

  • 2022-01-03
  • 2021-12-19
  • 2022-01-19
  • 2022-01-08
  • 2022-01-14
猜你喜欢
  • 2021-06-17
  • 2022-12-23
  • 2021-05-19
  • 2021-05-20
  • 2021-09-23
  • 2021-05-22
相关资源
相似解决方案