I/O multiplexing means that once the kernel notices that one or more of the I/O conditions a process has asked about are ready (e.g. ready for reading), it notifies that process.

An intuitive analogy (borrowed from a post online)

The terms are a mouthful; what matters is the idea. An epoll scene: one bartender (one thread) faces a row of drunks slumped at the bar. Suddenly one shouts "refill!" (an event); you trot over, pour him a glass, and leave him be. Another one calls for a refill, and you go pour for him too. One waiter serves many people this way, and when nobody wants a drink the waiter is idle and can fiddle with his phone. The difference between epoll and select/poll is that in the latter two scenes the drunks never speak up: you have to go down the line asking each one whether he wants a drink, leaving no time for the phone. "I/O multiplexing" roughly means these drunks sharing one waiter.

The three functions

1. select

The process tells the kernel which file descriptors (at most 1024 fds) to watch for which events. While none of those events has occurred, the process is blocked; when one or more of them occur, the process is woken up.

What happens when we call select():

  1. A context switch into kernel mode
  2. The fds are copied from user space into kernel space
  3. The kernel walks all the fds, checking whether the watched event has occurred on each
  4. If none has, the process is blocked; when a device driver raises an interrupt or the timeout expires, the process is woken and the scan runs again
  5. The fds found by the scan are returned
  6. The fds are copied from kernel space back to user space

fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
 
Parameters: four in total (the first three are required)
rlist: wait until ready for reading
wlist: wait until ready for writing
xlist: wait for an "exceptional condition"
timeout: timeout in seconds

Return value: three lists
 
select() monitors the given file descriptors (blocking while none of their conditions is met); when some descriptor's state changes it returns three lists:
1. fds from the first sequence that became "readable" are collected into fd_r_list
2. fds from the second sequence are collected into fd_w_list once they are writable (a socket is usually writable right away, so these tend to come back immediately)
3. fds from the third sequence on which an "exceptional condition" occurred are collected into fd_e_list
4. If the timeout is omitted, select blocks until one of the watched handles changes state.
   If the timeout is a number n, select blocks for at most n seconds: if none of the watched handles changed, it returns three empty lists after n seconds; if one did, it returns right away.
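The select() semantics above can be sketched with a quick, self-contained example; a local socketpair stands in for a network connection here, so no server setup is assumed:

```python
import select
import socket

# Two connected sockets; `a` is the one we monitor.
a, b = socket.socketpair()

# Nothing to read yet: with a timeout, select() returns three empty lists.
r, w, x = select.select([a], [], [], 0.1)
assert r == [] and x == []

# Once the peer writes, `a` shows up in the "readable" list.
b.sendall(b'ping')
r, w, x = select.select([a], [], [], 1.0)
assert r == [a]

data = a.recv(4)
print(data)  # b'ping'

a.close()
b.close()
```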

On the server side we end up calling select over and over, which means:

  1. With many file descriptors, copying them between user space and kernel space on every call is expensive
  2. With many file descriptors, the kernel's linear scan over all of them is also a waste of time
  3. select supports at most 1024 file descriptors

Reference: http://www.cnblogs.com/Anker/archive/2013/08/14/3258674.html

2. poll

Reference: http://www.cnblogs.com/Anker/archive/2013/08/15/3261006.html
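Python exposes the same mechanism through select.poll; a minimal sketch (Unix-only, again using a socketpair so it is self-contained) shows the two differences from select(): fds are registered once on a poll object, and poll() returns (fd, eventmask) pairs with no 1024-fd ceiling:

```python
import select
import socket

a, b = socket.socketpair()

p = select.poll()
p.register(a.fileno(), select.POLLIN)  # register once; interest persists across poll() calls

b.sendall(b'hi')
events = p.poll(1000)          # timeout is in milliseconds
fd, mask = events[0]
assert fd == a.fileno()
assert mask & select.POLLIN    # the mask is a bit field; test flags with &

data = a.recv(2)
p.unregister(a.fileno())
a.close()
b.close()
```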

3. epoll

Reference: http://www.cnblogs.com/Anker/archive/2013/08/17/3263780.html

epoll is the result of improving on select and poll; by comparison it has the following advantages:

1. The number of socket descriptors (fds) a single process can watch is essentially unlimited (bounded only by the operating system's maximum number of open file handles)

select's biggest flaw is the limit on the fds a single process can open, set by FD_SETSIZE, 1024 by default. epoll has no such limit; its ceiling is the operating system's maximum number of file handles, a number far larger than 1024.
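The actual per-process ceiling can be inspected with the (Unix-only) resource module, which reports the RLIMIT_NOFILE limit that bounds epoll, as opposed to select's compile-time FD_SETSIZE:

```python
import resource

# soft: the current per-process limit on open file descriptors;
# hard: the maximum the soft limit may be raised to.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(soft, hard)
```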

2. I/O efficiency does not degrade linearly as the number of fds grows

epoll's answer lies in the epoll_ctl function: each fd is copied into the kernel once, when its event is registered, instead of being copied again on every epoll_wait. epoll guarantees that each fd is copied only once over its whole lifetime.

The other fatal weakness of traditional select/poll: when you hold a large socket set, network latency and idle links mean that only a small fraction of the sockets are "active" at any moment, yet select/poll linearly scans the entire set on every call, so efficiency falls off linearly. epoll does not have this problem: it only acts on the "active" sockets, because the kernel implementation hangs a callback function on each fd, and only active sockets invoke their callback while idle ones stay quiet. In this sense, epoll implements a kind of pseudo-AIO.

3. mmap is used to speed up message passing between the kernel and user space

At epoll_ctl time, epoll walks the given fds once (a pass that cannot be avoided) and attaches a callback function to each. When a device becomes ready and wakes the waiters on its wait queue, that callback runs and appends the ready fd to a ready list; all epoll_wait really does is check whether that ready list has anything in it.

select, poll, and epoll alike need the kernel to hand fd notifications to user space, so avoiding needless memory copies matters; epoll does this by having the kernel and user space mmap the same block of memory.

4. epoll's API is simpler

epoll is not the only way around select/poll's weaknesses, just the Linux one: FreeBSD has kqueue, and /dev/poll is the oldest scheme, from Solaris, in increasing order of difficulty. epoll remains the simplest to use.

epoll in detail (in Python)

Python's select module is devoted to I/O multiplexing. It provides the select, poll, and epoll methods (the latter two available on Linux; Windows supports only select), and also a kqueue method for FreeBSD-family systems.
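A quick way to see which of these primitives the current interpreter and platform actually expose is to probe the module directly:

```python
import select

# Each primitive is present as a module attribute only on platforms that support it.
for name in ('select', 'poll', 'epoll', 'kqueue'):
    print(name, hasattr(select, name))
```

On a typical Linux box this prints True for select, poll, and epoll, and False for kqueue.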

select.epoll(sizehint=-1, flags=0)  Create an epoll object.

epoll.close()
Close the control file descriptor of the epoll object.

epoll.closed
True if the epoll object is closed.

epoll.fileno()
Return the file descriptor number of the control fd.

epoll.fromfd(fd)
Create an epoll object from a given file descriptor.

epoll.register(fd[, eventmask])
Register a fd with the epoll object, together with the events to watch for.

epoll.modify(fd, eventmask)
Modify the registered events for a file descriptor.

epoll.unregister(fd)
Remove a registered file descriptor from the epoll object.

epoll.poll(timeout=-1, maxevents=-1)
Wait for events; timeout is in seconds (float). Blocks until an event occurs on a registered fd, then returns a list of (fd, event) pairs: [(fd1, event1), (fd2, event2), …, (fdn, eventn)]

Events:

    EPOLLERR = 8               ---- an error occurred
    EPOLLET = 2147483648       ---- level-triggered by default; set this flag for edge-triggered
    EPOLLHUP = 16              ---- hang-up
    EPOLLIN = 1                ---- readable
    EPOLLMSG = 1024            ---- ignored
    EPOLLONESHOT = 1073741824  ---- one-shot behavior: after one event is pulled out, the fd is internally disabled
    EPOLLOUT = 4               ---- writable
    EPOLLPRI = 2               ---- urgent data readable
    EPOLLRDBAND = 128          ---- priority data band readable
    EPOLLRDNORM = 64           ---- equivalent to EPOLLIN
    EPOLLWRBAND = 512          ---- priority data band writable
    EPOLLWRNORM = 256          ---- equivalent to EPOLLOUT
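These values are bit flags (note the powers of two), so masks are combined with | when registering and tested with & when dispatching on what epoll.poll() returned; a quick check (the EPOLL* constants exist only on Linux):

```python
import select

# Register-time: combine the events of interest into one mask.
mask = select.EPOLLIN | select.EPOLLET

# Dispatch-time: test individual flags with bitwise AND.
assert mask & select.EPOLLIN
assert not (mask & select.EPOLLOUT)

# The numeric values match the table above.
assert select.EPOLLIN == 1
assert select.EPOLLOUT == 4
assert select.EPOLLET == 2147483648
```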

Level-triggered vs edge-triggered:

Level-triggered (sometimes called condition-triggered): when a readable/writable event occurs on a watched fd, epoll.poll() notifies the handler to go read or write. If you do not finish the data in one go (say the read/write buffer is too small), the next epoll.poll() call will report that unfinished fd again, and it will keep reporting it for as long as you ignore it. If the system accumulates a large number of ready fds you never intend to touch, they all come back on every call, which badly hurts the handler's ability to find the ready fds it actually cares about. The upside is equally clear: it is stable and reliable.

Edge-triggered (sometimes called state-triggered): when a readable/writable event occurs on a watched fd, epoll.poll() notifies the handler to go read or write. If you do not finish the data this time (say the read/write buffer is too small), the next epoll.poll() will not remind you: you are told exactly once, and you hear about the fd again only when a second readiness event occurs on it. This mode is more efficient than level-triggered, and the system is not flooded with ready fds you do not care about; the drawback is that under some conditions it is less forgiving.
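Because an edge-triggered notification arrives only once per readiness change, the usual companion pattern is to make the socket non-blocking and read in a loop until recv() reports there is nothing left. A small sketch of that drain loop (the socketpair here just stands in for a client connection):

```python
import errno
import socket

def drain(conn, bufsize=4096):
    """Read everything currently available from a non-blocking socket."""
    chunks = []
    while True:
        try:
            data = conn.recv(bufsize)
        except socket.error as e:
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                break          # buffer drained; wait for the next edge
            raise
        if not data:           # peer closed the connection
            break
        chunks.append(data)
    return b''.join(chunks)

a, b = socket.socketpair()
a.setblocking(False)
b.sendall(b'hello')
print(drain(a))  # b'hello'
```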

#!/usr/bin/python
# coding:utf-8

import select
import socket

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

sk = socket.socket()
sk.bind(('192.168.110.100', 8080))
sk.listen(5)
sk.setblocking(0)   # non-blocking mode

epoll = select.epoll()                       # create an epoll object
epoll.register(sk.fileno(), select.EPOLLIN)  # watch the listening socket for read events (an incoming connection shows up as one)
try:
    connections = {}; requests = {}; responses = {}
    while True:
        events = epoll.poll()  # block until an event we care about occurs
        for fileno, event in events:  # events is a list of (fileno, eventmask) tuples; fileno is an integer fd
            if fileno == sk.fileno():  # event on the listening socket: a new connection to accept
                connection, address = sk.accept()
                connection.setblocking(0)  # make the client socket non-blocking
                epoll.register(connection.fileno(), select.EPOLLIN)  # register for read (EPOLLIN) events on it
                connections[connection.fileno()] = connection  # remember the connection
                requests[connection.fileno()] = b''            # data received so far
                responses[connection.fileno()] = response      # data to send back

            elif event & select.EPOLLIN:  # read event: pull data from the client
                requests[fileno] += connections[fileno].recv(1024)
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:  # end-of-request marker seen
                    # a complete HTTP request has arrived: swap the read interest for a write
                    # interest (EPOLLOUT), which fires once we can send data back to the client
                    epoll.modify(fileno, select.EPOLLOUT)
                    print('-'*40 + '\n' + requests[fileno].decode()[:-2])

            elif event & select.EPOLLOUT:  # write event: we can push data to the client now
                byteswritten = connections[fileno].send(responses[fileno])  # returns the number of bytes actually sent
                responses[fileno] = responses[fileno][byteswritten:]  # slice off what was sent; an empty result means we are done
                if len(responses[fileno]) == 0:  # everything has been sent
                    epoll.modify(fileno, 0)  # stop watching for read and write events
                    connections[fileno].shutdown(socket.SHUT_RDWR)

            elif event & select.EPOLLHUP:  # the client hung up
                epoll.unregister(fileno)   # stop watching this fd
                connections[fileno].close()
                del connections[fileno]    # release the connection object

finally:
    epoll.unregister(sk.fileno())
    epoll.close()
    sk.close()
            
Server
#!/usr/bin/python
# coding:utf-8

import socket
obj = socket.socket()   
obj.connect(('192.168.110.100',8080))
obj.sendall('hello\n\r\n')  # ends with EOL2, so the server sees a complete request
print obj.recv(1024)
obj.close()
Client

 

A real-world example:

# /usr/bin/python
# coding:utf-8

import select
import socket
import sys
import Queue
import time
import threading
import logging
import datetime
import re, os
import hashlib

sys.path.append('../')

import multiprocessing

from SQLdb import SQLdb
from mylog import MyLog as Log

from communication_packet import Communication_Packet, Communication_Packet_Flags, Error_Info_Flags
from encryption import PrpCrypt
import pdb

'''
Constant Meaning
EPOLLIN Available for read
EPOLLOUT Available for write
EPOLLPRI Urgent data for read
EPOLLERR Error condition happened on the assoc. fd
EPOLLHUP Hang up happened on the assoc. fd
EPOLLET Set Edge Trigger behavior, the default is Level Trigger behavior
EPOLLONESHOT Set one-shot behavior. After one event is pulled out, the fd is internally disabled
EPOLLRDNORM Equivalent to EPOLLIN
EPOLLRDBAND Priority data band can be read.
EPOLLWRNORM Equivalent to EPOLLOUT
EPOLLWRBAND Priority data may be written.
EPOLLMSG Ignored.

'''


class Server(object):
    def __init__(self, server_IP=None, server_port=None):
        # def __init__(self,server_address = ('112.33.9.154',11366)):
        '''
        Initialize the server's global state
        '''
        # pdb.set_trace()

        # default mode: debug
        self.log = Log()
        self.log.openConsole()  # enable console output
        self.logger = self.log.getLog()
        self.dbname = 'sync_test'

        # Default: we use the local host IP and port 11366
        if server_IP is None or server_port is None:
            if server_IP is None:
                try:
                    self.server_IP = self.getlocalIP()
                    self.logger.info('Current server_IP:    %s' % self.server_IP)
                except:
                    self.logger.critical('Get server IP Error!')
                    raise

            if server_port is None:
                self.server_port = 11366
        else:
            self.server_IP = server_IP
            self.server_port = server_port

        self.server_address = (self.server_IP, self.server_port)  # server address
        self.ListenNum = 100  # maximum number of monitored socket connections
        self.connections = {}  # current connections
        self.requests = {}  # request data per connection
        self.addresses = {}  # client addresses
        self.errorInfo = {}  # error info; on failure it is sent back to the client
        self.responseInfo = {}
        self.readthreadRecord = {}

        self.lock = threading.Lock()  # thread lock for data synchronization
        self.db = SQLdb()  # database handle used for db sync
        self.setDB('localhost', 'root', '123456', 'sync_test')

        self.readthreadlock = threading.Lock()

        self.EOF = '\n\r\n'
        self.servernum = 'serverxxxxx'
        self.key = '91keytest'
        # set communication user id
        Communication_Packet.set_userid(self.servernum)
        self.encryption = PrpCrypt()
        pass

    def setServerAddree(self, server_ip, server_port):
        '''
        Set server address
        '''
        self.server_address = (server_ip, server_port)  # set the server address

    def setDB(self, host=None, username=None, password=None, dbname=None):
        self.db = SQLdb(host, username, password, dbname)

    def getlocalIP(self):
        '''
        Use the first NIC's address as the bind IP
        '''
        try:
            s = os.popen('ifconfig').read()
        except:
            raise
        else:
            ip = re.findall('inet addr:(?<![\.\d])(?:\d{1,3}\.){3}\d{1,3}(?![\.\d])', s)[0].split(':')[1]
        return ip

    def __init_server(self):
        '''
        Initialize the server socket and the epoll object
        '''
        try:
            # pdb.set_trace()
            # Create a TCP/IP socket
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # put the socket in non-blocking mode
            self.server.setblocking(0)

            # Bind the socket to the port
            self.logger.info('starting up on %s port %s' % self.server_address)
            try:
                self.server.bind(self.server_address)
            except:
                echo = os.popen('''lsof -i :11366 |grep "(LISTEN)" | awk '{printf($1)}' ''').read()
                print 'Port %s is already held by process %s!' % (self.server_port, echo)
                self.logger.error('Bind on %s port %s Error!' % self.server_address)
                raise

            # Listen for incoming connections
            # self.server.listen(self.ListenNum)
            self.server.listen(1)

            # Set up the epoll
            self.epoll = select.epoll()
            self.epoll.register(self.server.fileno(), select.EPOLLIN | select.EPOLLET)  # register a read event for the server socket, edge-triggered
        except:
            raise Exception('__init_server Error')

    def __jionthreads(self):
        '''
        join the threading
        '''
        # self.logger.debug('Current threadtast is  %d ' % len(threading.enumerate()))
        main_thread = threading.currentThread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            else:
                t.join(0)  # non-blocking join

                # self.logger.debug('After joined the threads... %d '% len(threading.enumerate()))

    def __format_str(self, businessFlag, data, endFlag=True, errorFlag=False, hasnewconf=False, versioninfo=''):
        '''
        Format the data to send
        '''
        formatstr = {'BUSINESS_TYPE': businessFlag, 'DATA': data, 'ENDFLAG': endFlag, 'ERRORFLAG': errorFlag,
                     'HASNWECONF': hasnewconf, 'VERSIONINFO': versioninfo}
        return str(formatstr) + self.EOF

    def get_table_filed(self, table_name, db, lock, db_name):
        # pdb.set_trace()
        # from the db get the table field!
        query_detection_version_field_sql = "select COLUMN_NAME from information_schema.COLUMNS where table_name = '%s' and TABLE_SCHEMA = '%s';" % (
        table_name, db_name)
        with self.lock:
            detection_version_fields = self.db.fechdb(query_detection_version_field_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            # Record Error and end task
            DB_ERROR = False
            self.logger.error('----Get %s fields Error! End this task-----' % table_name)
            return
        else:
            # query result is Unicode,so we need to encode to utf-8
            table_field = [field[0].encode('utf-8') for field in detection_version_fields]
            return table_field

    def calc_md5(self, data):
        return hashlib.md5(data).hexdigest()

    def validating_message_token(self, receving_data):
        # print receving_data
        # pdb.set_trace()
        # print receving_data
        print len(receving_data)
        pre_md5 = receving_data[:16]
        suffix_md5 = receving_data[-16:]
        message_md5 = pre_md5 + suffix_md5
        message = receving_data[16:-16]
        cur_md5 = self.calc_md5(message)
        print cur_md5, message_md5
        if message_md5 == cur_md5:
            return True, message
        else:
            return False, message
        pass

    def validating_content_token(self, content):
        receive_content = content['Business']['Content']
        if not isinstance(receive_content, str):
            receive_content = str(receive_content)
        receive_md5 = content['Info_Status']['Security_Token']
        receive_time = content['Info_Status']['Time']
        cur_md5 = self.calc_md5(receive_content + receive_time + self.key)
        if cur_md5 == receive_md5:
            return True
        else:
            return False
        pass

    def packaging_message(self, Message=None, Error=None, Info_Status=None):
        # ----Pack send message
        # Init Communication_Packet
        cm_packet = Communication_Packet()

        # def set_content_Business(self,b_type,b_content,table_name = None,table_field = None,b_is_syncdb = True):
        cm_packet.set_content_Business(targv=Message)

        # def set_content_Error(self,error_flag = False,error_type = None,error_info = None):
        cm_packet.set_content_Error(targv=Error)

        now_time = str(datetime.datetime.now())  # get current time
        # Business+time+key calculate token,calculate token
        security_token = self.calc_md5(str(cm_packet.CMC_Business) + now_time + self.key)
        # def set_content_Info_Status(self,info_type,security_token,time,is_end):
        # Need to replace the security_token
        Info_Status = list(Info_Status)  # tuple -> list so the fields can be patched
        Info_Status[1] = security_token
        Info_Status[2] = now_time
        Info_Status = tuple(Info_Status)  # back to a tuple to pass as the argument

        cm_packet.set_content_Info_Status(targv=Info_Status)
        try:
            send_data = cm_packet.content
        except Exception, e:
            raise e
        else:
            # we Encryption data
            self.logger.debug(type(send_data))
            encryption_send_data = self.encryption.encrypt(str(send_data))
            # caculate md5
            encrypt_send_md5 = self.calc_md5(encryption_send_data)
            complete_send_data = encrypt_send_md5[:16] + encryption_send_data + encrypt_send_md5[-16:] + self.EOF
            return complete_send_data

    def unpackaging_message(self, unpacking_str):
        # pdb.set_trace()
        if not isinstance(unpacking_str, str):
            raise ValueError
        else:
            unpacking_str = unpacking_str.strip(self.EOF)
            flag, message = self.validating_message_token(unpacking_str)
            if flag:
                decrypt_str = self.encryption.decrypt(message)
                try:
                    message_dict = eval(decrypt_str)
                except Exception, e:
                    self.logger.error('Eval decrypt_str Error!')
                    raise e
                else:
                    if self.validating_content_token(message_dict):
                        return message_dict
                    else:
                        self.logger.error('Message is tampered!')
                        return None
                pass
            else:
                self.logger.error('Message is tampered!')
                return None

    def init_detection_nums(self):
        # get detect_point_nums from server db
        # pdb.set_trace()
        query_sql = "select distinct(sync_point_no) from sync_control"
        BD_ERROR = False
        with self.lock:
            point_nums = self.db.fechdb(query_sql)
            BD_ERROR = self.db.Error
        if BD_ERROR:
            self.logger.error('-----get detect_point_nums error-----')

        if point_nums:
            nums_list = [p[0] for p in point_nums]
            return nums_list
        else:
            return None

    def verification_sync_point_no(self, detection_num):
        detect_point_nums = self.init_detection_nums()
        if not detect_point_nums:
            self.logger.error('-----db config error!-----')
            raise
        if detection_num in detect_point_nums:
            return True
        else:
            return False

    def read_sync_contol_configure(self, table_name, sync_type, sync_point_no, sync_user='server'):
        # Read the control configuration from the local db; if it is missing, sync it from the server and then read it again
        # pdb.set_trace()
        qeury_sql = "select * from sync_control where sync_table = '%s' and sync_type = '%s' and sync_user = '%s' and sync_point_no = '%s';" % (
        table_name, sync_type, sync_user, sync_point_no)
        DB_ERROR = False

        # set table name
        sync_table_name = 'sync_control'

        # get `sync_control` table fields
        table_field = self.get_table_filed(sync_table_name, self.db, self.lock, self.dbname)
        if not table_field:
            self.logger.error('-----Get %s fields failed!-----' % table_name)
            return None

        with self.lock:
            control_configure = self.db.fechdb(qeury_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            self.logger.error('-----Get control configure Error!-----')
            return None
        if control_configure:
            # Got the configure from the db; classify by table name and sync type (upload or download)
            # format the configure to a list
            control_configure_list = []
            for iter_conf in control_configure:
                control_configure_item = dict.fromkeys(table_field)
                lenth = len(table_field)
                # set value for everyone key
                for i in range(lenth):
                    val = iter_conf[i]
                    if isinstance(val, unicode):
                        val = val.encode('utf-8')
                    control_configure_item[table_field[i]] = val
                control_configure_list.append(control_configure_item)
            return control_configure_list

    def parsing_config(self, config):
        # parsing the config to get the sql
        # may modify the logic of the code
        # pdb.set_trace()
        p_conf = dict()
        table_name = config['sync_table']  # Get the download table name
        p_conf['table_name'] = table_name
        table_field = config['sync_field']  # Get sync table field!
        p_conf['sync_type'] = config['sync_type']
        p_conf['sync_range'] = config['sync_range']
        p_conf['sync_range_value'] = config['sync_range_value']
        p_conf['sync_is_update_time'] = config['sync_is_update_time']

        # if table_field is null,we need sync all the field!
        if not table_field:
            table_field = self.get_table_filed(table_name, self.db, self.lock, self.dbname)
            if not table_field:
                self.logger.error(
                    '-----Terminate this task, because getting the %s table fields failed!-----' % table_name)
                return
        p_conf['table_field'] = table_field
        # Get this operation's type
        try:
            sql_operations = eval(config['sync_operation_type'])
        except Exception, e:
            self.logger.error('-----get sync_operation_type error!-----')
            return

        upside_operate = sql_operations['upside']  # if have, this download operation need carry db info to the server!
        p_conf['upside_operate'] = upside_operate
        downside_operate = sql_operations['downside']  # how to handle the downloaded db info!
        p_conf['downside_operate'] = downside_operate
        update_state_operate = sql_operations['update_state']
        p_conf['update_state_operate'] = update_state_operate

        # Get the sync sql of the corresponding operation
        try:
            sqls = eval(config['sync_sql'])
        except Exception, e:
            self.logger.error('-----get sync_sql error!-----')
            raise

        upside_sqls = sqls['upside']  # a tuple or None
        p_conf['upside_sqls'] = upside_sqls

        downside_sqls = sqls['downside']  # a tuple or None
        p_conf['downside_sqls'] = downside_sqls

        update_state_sqls = sqls['update_state']  # a tuple or None
        p_conf['update_state_sqls'] = update_state_sqls

        # Get the sync patch field of the corresponding operation
        try:
            if config['sync_patch_field']:
                patch_fields = eval(config['sync_patch_field'])
            else:
                # nothing configured: default the patch fields for every operation to None
                patch_fields = {'upside': None, 'downside': None, 'update_state': None}
        except Exception, e:
            self.logger.error('-----get sync_field error!-----')
            raise

        upside_fields = patch_fields['upside']  # a tuple or None
        p_conf['upside_fields'] = upside_fields

        downside_fields = patch_fields['downside']  # a tuple or None
        p_conf['downside_fields'] = downside_fields

        update_state_fields = patch_fields['update_state']  # a tuple or None
        p_conf['update_state_fields'] = update_state_fields

        # Get the sync_field_value of the corresponding operation
        try:
            if config['sync_patch_field_value']:
                patch_field_values = eval(config['sync_patch_field_value'])
            else:
                # nothing configured: default the patch field values for every operation to None
                patch_field_values = {'upside': None, 'downside': None, 'update_state': None}
        except Exception, e:
            self.logger.error('-----get sync_field_value error!-----')
            return

        upside_patch_field_values = patch_field_values['upside']  # a tuple or None
        p_conf['upside_patch_field_values'] = upside_patch_field_values

        downside_patch_field_values = patch_field_values['downside']  # a tuple or None
        p_conf['downside_patch_field_values'] = downside_patch_field_values

        update_state_patch_field_values = patch_field_values['update_state']  # a tuple or None
        p_conf['update_state_patch_field_values'] = update_state_patch_field_values

        is_carry_state = config['sync_is_carry_state']
        p_conf['is_carry_state'] = is_carry_state

        is_update_state = config['sync_is_update_state']
        p_conf['is_update_state'] = is_update_state

        is_use_state_carry_data = config['sync_is_use_state_carry_data']
        p_conf['is_use_state_carry_data'] = is_use_state_carry_data

        return p_conf

    def __proxy(self, FileNo):
        # handle the write event on a separate thread
        newthread = threading.Thread(target=self.__handler_write_event, args=(FileNo,))
        # newthread.daemon = True
        newthread.start()

    def __delevnet(self, FileNo):
        '''
        Unregister events we no longer care about and release the related resources
        '''
        self.logger.info('Start to unregister and close %s socket... ' % FileNo)
        self.epoll.unregister(FileNo)
        self.connections[FileNo].close()
        del self.connections[FileNo]
        del self.requests[FileNo]
        del self.addresses[FileNo]
        del self.errorInfo[FileNo]
        self.logger.info('unregistered and closed the %s socket! ' % FileNo)

    def __read_from_socket(self, FileNo):
        '''
        Read data from socket
        '''
        try:
            while True:
                tmpdata = self.connections[FileNo].recv(4096)
                # print 'tmpdata: %s' % tmpdata
                # Python's select module lacks EPOLLRDHUP; when the client closes the
                # connection (or hits Ctrl+C before sending anything), the server only
                # sees EPOLLIN and recv() returns empty data... no better workaround found.
                if not tmpdata:
                    break
                self.requests[FileNo] += tmpdata
                # self.logger.debug(len(tmpdata))
        except socket.error:
            pass
        except Exception, e:
            raise e

    # error_flag = False,error_type = None,error_info = None
    def __deal_business(self, FileNo):
        # handle the client's business according to the received data
        # pdb.set_trace()
        try:
            message = self.unpackaging_message(self.requests[FileNo])
            # reset the request buffer for this connection
            self.requests[FileNo] = ''
        # self.logger.debug(message)
        except:
            # record the error flag and the error message
            self.logger.error('unpackaging_message Error!')
            self.errorInfo[FileNo] = (Error_Info_Flags.Receive_Data_Error, 'Server recieved data Error!')
            return
        else:
            if message:
                business = message['Business']
                client_id = message['userid']
                error_info = message['Error']
                info_states = message['Info_Status']
                verification_result = self.verification_sync_point_no(client_id)
                if verification_result:
                    # Here we handle the business
                    # 1、get config from the db
                    # read_sync_contol_configure(self,table_name,sync_type,sync_point_no,sync_user):
                    try:
                        table_name = business['Table_Name']
                        sync_type = business['Type']
                        is_sync_flag = business['Is_Syncdb']
                    except:
                        self.errorInfo[FileNo] = (
                        Error_Info_Flags.Receive_Data_Error, 'Business information is incomplete!')
                        return
                    if sync_type == Communication_Packet_Flags.DOLOAD_DB:
                        s_type = 'download'
                    else:
                        s_type = 'upload'
                    b_config = self.read_sync_contol_configure(table_name, s_type, client_id)
                    # pdb.set_trace()
                    if b_config:
                        p_config = [self.parsing_config(conf) for conf in b_config]
                        self.logger.debug(p_config)
                    else:
                        pass
                    if b_config:
                        self.real_business_processing_functions(FileNo, p_config, business, info_states, error_info)
                    else:
                        # set error info!
                        self.errorInfo[FileNo] = (
                        Error_Info_Flags.Server_config_Error, 'Server config is None, give up this task!')

                else:
                    # user authentication failed
                    self.logger.error('-----User authentication failed! userid: %s-----' % client_id)
                    self.errorInfo[FileNo] = (Error_Info_Flags.User_Certification_Error, 'User authentication failed!')
            else:
                # no message: the client's information failed authentication
                self.logger.error('-----Client\'s information authentication failed!-----')
                self.errorInfo[FileNo] = (
                Error_Info_Flags.Info_Certification_Error, 'Information authentication failed!')

    def calculate_time(self, time_type, time_value):
        # time_value may be a str; convert it to int
        time_value = int(time_value)

        # use the current time as the end of the window
        cur_time = datetime.datetime.now()
        hours = 0
        if time_type == 'hour':
            hours = time_value
        elif time_type == 'day':
            hours = time_value * 24
        elif time_type == 'week':
            hours = time_value * 24 * 7
        elif time_type == 'month':
            hours = time_value * 24 * 30
        else:
            self.logger.error('-----time_type Error!-----')
            return None
        # calculate the start of the window
        start_time = cur_time - datetime.timedelta(hours=hours)
        return (start_time, cur_time)

    # handle the business requests from the client
    def real_business_processing_functions(self, FileNo, business_config, business, info_states, error_info):
        # pdb.set_trace()
        # according to the config we handle the business
        business_config = business_config[0]
        if info_states['Info_Type'] == Communication_Packet_Flags.REQEST:
            # get bussiness type
            request_bussiness_type = business['Type']
            if request_bussiness_type == Communication_Packet_Flags.UPLOAD_DB:
                request_bussiness_type = 'upload'
            elif request_bussiness_type == Communication_Packet_Flags.DOLOAD_DB:
                request_bussiness_type = 'download'
            else:
                self.errorInfo[FileNo] = (
                Error_Info_Flags.Client_Data_Pack_Error, 'Request business type error %s' % request_bussiness_type)
                return

            loc_config_sync_type = business_config['sync_type']
            if request_bussiness_type == loc_config_sync_type:

                is_carry_state = business_config['is_carry_state']
                is_use_state_carry_data = business_config['is_use_state_carry_data']
                is_update_state = business_config['is_update_state']
                # handle the download request
                if request_bussiness_type == 'download':
                    # parse the local config
                    up_sql_list = []
                    upside_operates = business_config['upside_operate'].split('|')
                    upside_sqls = business_config['upside_sqls']
                    upside_fields = business_config['upside_fields']
                    upside_patch_field_values = business_config['upside_patch_field_values']
                    sync_range = business_config['sync_range']
                    sync_range_value = business_config['sync_range_value']

                    lenth = len(upside_sqls)
                    for i in range(lenth):
                        sql_part = upside_sqls[i]

                        # if sync_range is not None,we will ignore the other
                        if sync_range:
                            if sync_range == 'period':
                                t_type, t_value = sync_range_value.split(':')
                                s_time, e_time = self.calculate_time(t_type, t_value)
                                qeury_sql = sql_part % (str(s_time), str(e_time))
                            else:
                                qeury_sql = sql_part
                            # add it into the list
                            up_sql_list.append(qeury_sql)
                        else:
                            # we need parsing other configurations
                            if is_use_state_carry_data:
                                try:
                                    # [((u'update_time', u'1970-01-01 00:00:00'),)] limk this
                                    qeury_sql = sql_part % business['Content'][0][0][1]
                                except:
                                    self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'Content Error!')
                            else:
                                qeury_sql = sql_part
                            up_sql_list.append(qeury_sql)
                    query_data = []
                    for u_sql in up_sql_list:
                        BD_ERROR = False
                        with self.lock:
                            res = self.db.fechdb(u_sql)
                            BD_ERROR = self.db.Error
                        if BD_ERROR:
                            self.errorInfo[FileNo] = (
                            Error_Info_Flags.Server_DB_Error, 'Server db Error,SQL: %s' % u_sql)
                            break
                        else:
                            query_data.append(res)
                    self.responseInfo[FileNo] = query_data
                # handle the upload request
                elif request_bussiness_type == 'upload':
                    # pdb.set_trace()
                    # parsing the loacal config
                    content = business['Content']
                    try:
                        self.refresh_the_database(business_config, content)
                    except Exception, e:
                        print e
                        self.errorInfo[FileNo] = (Error_Info_Flags.Server_config_Error, 'Server Config Error!')

                else:
                    self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'bussiness type Error!')
            else:
                self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error,
                                          'server config type is different from client request business type! Error!')
        else:
            self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'Communication_Packet_Flags Error!')

    # update the db
    def refresh_the_database(self, handle_config, db_content):
        '''
        Refresh the database: the configured operations may be insert, update, delete...
        '''
        # parse the handle config
        table_name = handle_config['table_name']
        table_field = handle_config['table_field']
        downside_operate = handle_config['downside_operate']
        update_state_operate = handle_config['update_state_operate']
        downside_sqls = handle_config['downside_sqls']
        update_state_sqls = handle_config['update_state_sqls']
        downside_fields = handle_config['downside_fields']
        update_state_fields = handle_config['update_state_fields']
        downside_patch_field_values = handle_config['downside_patch_field_values']
        update_state_patch_field_values = handle_config['update_state_patch_field_values']
        is_update_time = handle_config['sync_is_update_time']

        try:
            # NOTE: eval() on a config string is risky; ast.literal_eval would be safer
            table_field = eval(table_field)
            if not table_field:
                table_field = self.get_table_filed(table_name, self.lock, self.db, self.dbname)
            first_field = table_field[0]
        except Exception, e:
            self.logger.error('-----eval table_field error, config is error!-----')
            raise e
        # if the first column is an auto-increment id, it is stripped from each row below
        is_id = (first_field == 'id')

        download_operations = downside_operate.split('|')
        if 'file' in download_operations:
            filename = self.createNewBlackListPath()
            handle_flag = self.handle_file_func(db_content, filename)
            return handle_flag
        try:
            is_update_time = int(is_update_time)
        except (ValueError, TypeError):
            self.logger.error('-----is_update_time config value error!-----')
            raise
        for db_item in db_content:
            if is_update_time:
                # replace the update_time column with the current server time
                time_index = table_field.index('update_time')
                update_time = (str(datetime.datetime.today()).split('.')[0],)
                db_item = db_item[:time_index] + update_time + db_item[time_index + 1:]
            if is_id:
                # drop the auto-increment id column
                rowdata = db_item[1:]
            else:
                rowdata = db_item
            for myindex, oper in enumerate(download_operations):
                # collect the patched field values for this operation; the config
                # looks like '((fixed,true),(carry,None),(transfer,None))'
                fields_value = []
                for i in range(len(downside_patch_field_values[myindex])):
                    val = downside_patch_field_values[myindex][i]
                    if val[0] == 'fixed':
                        pass
                    elif val[0] == 'carry':
                        pass
                    elif val[0] == 'transfer':
                        # transfer: copy this field's value from the uploaded row
                        field_name = downside_fields[myindex][i]
                        v_index = table_field.index(field_name)
                        fields_value.append(db_item[v_index])
                    else:
                        self.logger.error('-----server downside_patch_field_values Error! values:    %s------' % str(
                            downside_patch_field_values))
                if fields_value:
                    d_sql, f_val = self.pre_handle_None_value(downside_sqls[myindex],
                                                              self.format_field_value(fields_value))
                    db_sql = self.format_sql(d_sql, f_val)
                else:
                    db_sql = downside_sqls[myindex]

                DB_ERROR = False
                with self.lock:
                    if oper == 'insert':
                        self.db.insertdb(db_sql)
                        DB_ERROR = self.db.Error
                    if oper == 'update':
                        self.db.updatedb(db_sql)
                        DB_ERROR = self.db.Error
                    if oper == 'delete':
                        self.db.deldb(db_sql)
                        DB_ERROR = self.db.Error
                if not DB_ERROR:
                    # this operation succeeded, so skip the remaining fallback operations
                    break
        return True
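The update_time handling above replaces one element of an immutable row tuple by slicing around its index. A standalone sketch of the same idea (the field names here are illustrative, not from the server config):

```python
import datetime

def patch_update_time(row, fields, now=None):
    """Return a copy of `row` with its update_time element replaced."""
    if now is None:
        now = str(datetime.datetime.today()).split('.')[0]  # drop microseconds
    i = fields.index('update_time')
    # tuples are immutable, so rebuild the row by slicing around index i
    return row[:i] + (now,) + row[i + 1:]

fields = ['id', 'name', 'update_time']
row = (1, 'alice', '1970-01-01 00:00:00')
patched = patch_update_time(row, fields, now='2024-01-01 00:00:00')
# patched == (1, 'alice', '2024-01-01 00:00:00')
```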

    def format_tuple(self, tup):
        '''
        When data is fetched with MySQLdb, a NULL field comes back as None.
        Format None as an unquoted NULL so the row can be written back to the DB.
        '''
        value_list = ['NULL' if t is None else t for t in tup]
        pad_list = ['%s' if t is None else '\'%s\'' for t in tup]
        pad_str = ','.join(pad_list)
        return pad_str % tuple(value_list)
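The key point of format_tuple is that None gets an unquoted `%s` pad (so it becomes a bare SQL NULL) while every other value is quoted. A standalone sketch of the same logic:

```python
def format_tuple(tup):
    # None becomes the literal NULL; everything else is single-quoted
    values = ['NULL' if t is None else t for t in tup]
    pads = ['%s' if t is None else "'%s'" for t in tup]
    return ','.join(pads) % tuple(values)

print(format_tuple(('alice', None, 5)))  # 'alice',NULL,'5'
```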

    def format_sql(self, patch_sql, patch_field_value):
        if isinstance(patch_sql, str) and isinstance(patch_field_value, tuple):
            try:
                res_sql = patch_sql % patch_field_value
            except Exception:
                res_sql = None
            return res_sql
        else:
            self.logger.error('-----format_sql args type error-----')
            raise TypeError('format_sql expects (str, tuple)')

    def format_field_value(self, field_value):
        # we need to escape ' and " before the value is spliced into the SQL statement
        res_list = list()
        for val in field_value:
            if isinstance(val, unicode):
                val = val.encode('utf-8')
            if isinstance(val, str):
                f_val = val.replace('\'', '\\\'').replace('\"', '\\\"')
            else:
                f_val = val
            res_list.append(f_val)
        return tuple(res_list)
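format_field_value backslash-escapes embedded quotes by hand before the value is spliced into an SQL string. A standalone sketch (the Python 2 unicode-decoding branch is omitted). Note that hand-rolled escaping like this is fragile against SQL injection; when the driver allows it, parameterized queries such as `cursor.execute(sql, params)` are the safer choice:

```python
def escape_field_value(values):
    out = []
    for val in values:
        if isinstance(val, str):
            # backslash-escape single and double quotes for the SQL string
            val = val.replace("'", "\\'").replace('"', '\\"')
        out.append(val)
    return tuple(out)

print(escape_field_value(("it's", 'say "hi"', 42)))
```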

    def get_all_sub_str_index(self, index_str, sub_str, none_indexs):
        '''
        Return the character offsets of the occurrences of sub_str in index_str
        whose occurrence number appears in none_indexs.
        '''
        index_list = []
        start_index = 0
        cnt = 0
        while True:
            try:
                tmp_index = index_str.index(sub_str, start_index)
            except ValueError:
                break
            else:
                if cnt in none_indexs:
                    index_list.append(tmp_index)
                start_index = tmp_index + len(sub_str)
                cnt += 1
        return tuple(index_list)

    def pre_handle_None_value(self, patch_sql, field_values):

        # collect the indexes of all None values
        None_indexs = []
        for i in range(len(field_values)):
            if field_values[i] is None:
                None_indexs.append(i)

        if None_indexs:
            # offsets of the '%s' placeholders that correspond to None values
            s_indexs = self.get_all_sub_str_index(patch_sql, "'%s'", None_indexs)

            str_list = list(patch_sql)
            subtraction_index = 0
            for ix in s_indexs:
                # strip the quotes around '%s' so NULL is emitted unquoted
                str_list.pop(ix - subtraction_index)        # opening quote
                str_list.pop(ix - subtraction_index + 2)    # closing quote (list already shifted by one pop)
                subtraction_index += 2
            replace_str = ''.join(str_list)
            res_field_values = ['NULL' if f_val is None else f_val for f_val in field_values]

            return replace_str, tuple(res_field_values)
        else:
            return patch_sql, field_values
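pre_handle_None_value works together with get_all_sub_str_index: for every None value it strips the quotes around the matching `'%s'` placeholder, so the NULL literal lands unquoted in the final SQL. A condensed standalone sketch of the pair:

```python
def placeholder_indexes(sql, sub, wanted):
    """Offsets of the occurrences of `sub` whose occurrence number is in `wanted`."""
    found, start, cnt = [], 0, 0
    while True:
        try:
            i = sql.index(sub, start)
        except ValueError:
            break
        if cnt in wanted:
            found.append(i)
        start = i + len(sub)
        cnt += 1
    return tuple(found)

def prepare_null_values(sql, values):
    none_at = [i for i, v in enumerate(values) if v is None]
    if not none_at:
        return sql, values
    chars = list(sql)
    shift = 0
    for ix in placeholder_indexes(sql, "'%s'", none_at):
        chars.pop(ix - shift)        # opening quote
        chars.pop(ix - shift + 2)    # closing quote (list already shifted by one pop)
        shift += 2
    fixed = ['NULL' if v is None else v for v in values]
    return ''.join(chars), tuple(fixed)

sql, vals = prepare_null_values("INSERT INTO t VALUES ('%s','%s')", ('a', None))
print(sql % vals)  # INSERT INTO t VALUES ('a',NULL)
```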

    def __handler_read_event(self, FileNo):
        self.logger.info('Start handling the received data...')
        # run the business logic on the received data
        try:
            self.__deal_business(FileNo)
        except Exception, e:
            self.logger.error('__deal_business Exception:    %s' % e)
            try:
                self.epoll.modify(FileNo, select.EPOLLOUT | select.EPOLLET | select.EPOLLONESHOT)
            except Exception:
                pass
            self.logger.error('Deal_business Error')
        else:
            self.modify_revent_to_wevent(FileNo)
            self.logger.info('Handling the received data finished!')

    # content = dict(sync_point=self.detect_piont_name,sync_point_no = self.detect_piont_serial_number)
    # message_info = (Communication_Packet_Flags.DOLOAD_DB,str(content),True,table_name,table_field)
    # error_info = (False,None,None)
    # message_status = (Communication_Packet_Flags.RESPONSE,None,str(datetime.datetime.now()),True)

    def __handler_write_event(self, FileNo):

        # if errorInfo is not empty, send the error to the client; otherwise handle the write business
        Error_Info = None
        try:
            Error_Info = self.errorInfo[FileNo]
            # reset the error info
            self.errorInfo[FileNo] = ''
        except KeyError:
            # the socket has already been deregistered from the list, so just return
            self.logger.info('This socket is removed from error info list!')
            return
        error_info = (False, None, None)
        if Error_Info:
            print Error_Info  # for debugging
            error_info = (True, Error_Info[0], Error_Info[1])

        response = self.responseInfo[FileNo]
        # reset the response info
        self.responseInfo[FileNo] = ''

        res_info = (None, response, None, None, False, None)

        info_states = (Communication_Packet_Flags.RESPONSE, None, None, True)

        message = self.packaging_message(res_info, error_info, info_states)
        self.logger.debug(message)
        self.send_message(FileNo, message, True)  # send the message to the client

        self.modify_wevent_to_revent(FileNo)  # switch back to the read event

    def modify_wevent_to_revent(self, FileNo):
        '''
        After the write event is handled, switch the fd back to the read event.
        '''
        try:
            self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLIN | select.EPOLLONESHOT)
        except Exception:
            pass

    def modify_revent_to_wevent(self, FileNo):
        '''
        After the read event is handled, switch the fd to the write event.
        '''
        try:
            self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLOUT | select.EPOLLONESHOT)
        except Exception:
            pass

    def send_message(self, FileNo, message, blocking=True):
        # if the message is big, sending in non-blocking mode can fail,
        # so we temporarily switch the socket to blocking mode
        if FileNo not in self.connections:
            self.logger.debug('This socket not in the connections list!')
            return
        if blocking:
            self.connections[FileNo].setblocking(True)
        try:
            self.connections[FileNo].sendall(message)
        except Exception, e:
            pass
        # finally set it back to non-blocking, since the epoll loop uses non-blocking sockets
        if blocking:
            self.connections[FileNo].setblocking(False)
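send_message temporarily switches the socket back to blocking mode so sendall can push a large message without a partial-write error, then restores non-blocking mode for the epoll loop. The toggle can be demonstrated with a local socket pair (socket.socketpair is available on POSIX, and on Windows from Python 3.5):

```python
import socket

a, b = socket.socketpair()
a.setblocking(False)        # how sockets live in the epoll loop
try:
    a.setblocking(True)     # temporarily blocking, so sendall can finish
    a.sendall(b'hello, client')
finally:
    a.setblocking(False)    # restore non-blocking for the event loop

data = b.recv(64)
a.close(); b.close()
print(data)  # b'hello, client'
```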

    def start_server(self):
        try:
            self.__init_server()  # initialise the server
        except Exception:
            # server initialisation failed
            self.logger.critical('Init server Error...')
            raise

        while True:
            # join the finished worker threads
            self.__jionthreads()

            # wait for at least one of the sockets to be ready for processing
            self.logger.info('waiting for the next event')
            events = self.epoll.poll()

            for fileno, event in events:
                # handle inputs
                if fileno == self.server.fileno():
                    try:
                        while True:
                            connection, address = self.server.accept()
                            connection.setblocking(0)  # set the new connection to non-blocking mode
                            # We can not use the select.EPOLLONESHOT flag here: it would disable
                            # the fd after the first event until it is explicitly re-armed.
                            self.epoll.register(connection.fileno(),
                                                select.EPOLLIN | select.EPOLLET)  # register the new connection as edge-triggered too
                            self.connections[connection.fileno()] = connection  # record the connection
                            self.requests[connection.fileno()] = ''  # record the business request
                            self.addresses[connection.fileno()] = address  # record the peer address
                            # an empty string means no error / no response yet
                            self.errorInfo[connection.fileno()] = ''
                            self.responseInfo[connection.fileno()] = ''
                            self.logger.info('========================================')
                            self.logger.info('Client %s:%s connected to server' % address)
                    except socket.error:
                        pass

                elif event & select.EPOLLIN:

                    # read data from the socket until all data has been received
                    self.logger.debug('EVENT EPOLLIN: %s' % hex(event))
                    try:
                        r_flag = self.__read_from_socket(fileno)
                    except socket.error:
                        pass
                    except Exception, e:
                        # any other exception means receiving data from the client failed,
                        # so we need to send an error back to the client
                        self.logger.warning('Catch other exception when receiving data!')

                        self.errorInfo[fileno] = (
                            Error_Info_Flags.Receive_Data_Error, '-----Server received data Error!-----')
                        self.modify_revent_to_wevent(fileno)
                    else:
                        # if evaluating the data raised no exception, the client data is complete,
                        # so a new thread can deal with it
                        if self.requests[fileno]:
                            if self.requests[fileno].endswith(self.EOF):
                                # start a new thread to process the client request
                                newthread = threading.Thread(target=self.__handler_read_event, args=(fileno,))
                                newthread.daemon = True
                                newthread.start()
                                print 'start %s' % newthread.name
                        else:
                            # no data was read from the client, so it has closed; hang up on our side
                            self.__delevnet(fileno)

                elif event & select.EPOLLOUT:
                    self.logger.debug('EVENT EPOLLOUT: %s' % bin(event))
                    # a write event happened; handle it via the proxy function
                    # (a plain function, not a thread: a thread here can be triggered multiple times)
                    self.__proxy(fileno)

                elif event & select.EPOLLHUP:
                    self.logger.debug('EVENT EPOLLHUP: %s' % bin(event))
                    # the client hung up, remove the event
                    self.logger.info("closing    %s    %s    (HUP)" % self.addresses[fileno])
                    self.__delevnet(fileno)

                elif event & select.EPOLLERR:
                    self.logger.info("    exception on %s" % self.connections[fileno].getpeername())
                    self.__delevnet(fileno)

                else:
                    # other events are not handled
                    pass


if __name__ == '__main__':
    # pdb.set_trace()
    myserver = Server()
    myserver.start_server()
