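I/O multiplexing means that as soon as the kernel finds that one or more I/O conditions specified by a process are ready (for example, ready for reading), it notifies that process.
An informal explanation (quoted from an expert online):
Three functions
1. select
The process tells the kernel which file descriptors (at most 1024 fds) to watch and for which events. While no watched descriptor has an event, the process is blocked; when an event occurs on one or more of them, the process is woken up.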
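When we call select():
1. A context switch moves execution into kernel mode.
2. The fd sets are copied from user space into kernel space.
3. The kernel walks through every fd and checks whether its event has occurred.
4. If none has, the process is put to sleep. It is woken up when a device driver signals readiness (via an interrupt) or the timeout expires, and the scan is then repeated.
5. The fds found ready by the scan are returned.
6. The fd sets are copied from kernel space back to user space.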
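fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist[, timeout])

Parameters: up to four arguments (the first three are required):
rlist: wait until ready for reading
wlist: wait until ready for writing
xlist: wait for an "exceptional condition"
timeout: timeout in seconds (optional)

Return value: three lists. select monitors file descriptors and blocks while none of them meets its condition. As soon as one changes state, it returns three lists:
1. The fds in rlist that have become readable are put into fd_r_list.
2. The fds in wlist that are ready for writing are put into fd_w_list. A socket with free space in its send buffer counts as writable, so in practice this is usually every fd passed in wlist.
3. The fds in xlist that have an exceptional condition (for sockets, typically pending out-of-band data) are put into fd_e_list.
4. If timeout is omitted, select blocks until at least one watched descriptor changes state. If timeout = n (seconds, may be a float), select blocks for at most n seconds. If nothing changes in that time, it returns three empty lists; if a watched descriptor changes earlier, it returns immediately. A timeout of 0 polls without blocking.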
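The return values described above can be observed directly. The following is a small Python 3 sketch. It assumes a Unix system where socket.socketpair() provides a connected pair of sockets to stand in for a client and a server. demo_select is a hypothetical helper name.

```python
import select
import socket


def demo_select(timeout=0.1):
    """Show what select.select() returns before and after data arrives."""
    a, b = socket.socketpair()  # a connected pair standing in for client/server
    try:
        # No data yet: select waits up to `timeout` seconds, then returns three empty lists.
        idle = select.select([b], [], [], timeout)
        a.sendall(b"ping")
        # b now has data to read; a has free send-buffer space, so it is writable.
        r, w, x = select.select([b], [a], [], timeout)
        ready = (r == [b], w == [a], x == [])
        data = b.recv(1024)
        return idle, ready, data
    finally:
        a.close()
        b.close()


idle, ready, data = demo_select()
print(idle)   # ([], [], [])
print(ready)  # (True, True, True)
print(data)   # b'ping'
```

The returned lists contain the very objects that were passed in, which is why a server can use them directly to look up its per-connection state.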
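Because a server has to call select over and over, this means:
1. With many file descriptors, copying them between user space and kernel space on every call is costly.
2. With many file descriptors, the kernel's scan over all of them on every call also wastes time.
3. select supports at most 1024 file descriptors (FD_SETSIZE).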
Reference: http://www.cnblogs.com/Anker/archive/2013/08/14/3258674.html
2. poll
Reference: http://www.cnblogs.com/Anker/archive/2013/08/15/3261006.html
3. epoll
Reference: http://www.cnblogs.com/Anker/archive/2013/08/17/3263780.html
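The referenced article covers the C poll() call, which Python exposes as select.poll(). Below is a minimal sketch under the same socketpair assumption; demo_poll is a hypothetical helper. Unlike select(), interest is registered once on the poll object and there is no FD_SETSIZE cap. Each call, however, still has the kernel check every registered fd. Note that poll()'s timeout is given in milliseconds.

```python
import select
import socket


def demo_poll(timeout_ms=100):
    """Register one socket with select.poll() and wait for it to become readable."""
    a, b = socket.socketpair()
    try:
        p = select.poll()
        # Interest is registered once and kept by the poll object.
        p.register(b, select.POLLIN)
        before = p.poll(timeout_ms)   # nothing to read yet -> []
        a.sendall(b"hi")
        after = p.poll(timeout_ms)    # list of (fd, eventmask) pairs
        return before, after, b.fileno()
    finally:
        a.close()
        b.close()


before, after, fd = demo_poll()
print(before)                           # []
print(after == [(fd, select.POLLIN)])   # True
```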
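epoll is the result of improving on select and poll. Compared with them, epoll has the following advantages:
1. The number of socket descriptors (FDs) a process can watch is not capped (it is limited only by the operating system's maximum number of open file handles)
The biggest flaw of select is that the number of FDs a single process can watch is limited by FD_SETSIZE, which defaults to 1024. epoll has no such limit: its upper bound is the maximum number of file handles the system allows, which is far larger than 1024.
2. I/O efficiency does not degrade linearly as the number of FDs grows
Part of epoll's solution lies in epoll_ctl. An fd is copied into the kernel once, when it is registered with the epoll handle through epoll_ctl, instead of the whole set being copied again on every epoll_wait call. epoll thus guarantees that each fd is copied only once for as long as it stays registered.
Another fatal weakness of traditional select/poll shows up with a large set of sockets. Because of network latency or idle links, only a small fraction of them are "active" at any moment, yet every select/poll call scans the whole set linearly, so efficiency drops linearly as the set grows. epoll does not have this problem because it only operates on "active" sockets. In the kernel, epoll is built on a callback attached to each fd: only "active" sockets invoke their callback, while idle sockets do not. In this respect epoll implements a kind of pseudo-AIO.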
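To check the limits this paragraph refers to on a Unix system, a small sketch can read the per-process open-file limit with the standard resource module. On Linux, the system-wide ceiling is in /proc/sys/fs/file-max. Which of the two limits actually binds in a given deployment is an assumption you should verify on your own machine. open_file_limits is a hypothetical helper name.

```python
import resource


def open_file_limits():
    """Return the (soft, hard) per-process limit on open file descriptors.

    The soft limit is what applies to this process right now; it can be raised
    up to the hard limit with resource.setrlimit().
    """
    return resource.getrlimit(resource.RLIMIT_NOFILE)


soft, hard = open_file_limits()
print("soft limit:", soft, "hard limit:", hard)  # values vary by system
```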
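The per-fd callback lives inside the kernel and cannot be observed from Python, but its effect can. The following Linux-only sketch registers many idle sockets plus one socket that has data. epoll.poll() then returns only the active fd, rather than a whole set the caller has to scan. demo_only_active and the figure of 100 idle sockets are arbitrary choices for illustration.

```python
import select
import socket


def demo_only_active(n_idle=100, timeout=0.1):
    """Register many idle sockets plus one active one; epoll reports only the active fd."""
    pairs = [socket.socketpair() for _ in range(n_idle + 1)]
    ep = select.epoll()
    try:
        for _, reader in pairs:
            ep.register(reader.fileno(), select.EPOLLIN)  # registered once via epoll_ctl
        writer, active_reader = pairs[-1]
        writer.sendall(b"x")                 # make exactly one fd ready
        events = ep.poll(timeout)            # list of (fd, eventmask)
        return events, active_reader.fileno()
    finally:
        ep.close()
        for w, r in pairs:
            w.close()
            r.close()


events, active_fd = demo_only_active()
print(events == [(active_fd, select.EPOLLIN)])  # True
```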
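3. Less copying between kernel and user space
When epoll_ctl is called, epoll visits each specified fd once (this pass is unavoidable) and installs a callback for it. When the device becomes ready and wakes up the waiters on its wait queue, this callback runs and appends the ready fd to a ready list. All epoll_wait really does is check this ready list for ready fds.
Whether you use select, poll or epoll, the kernel has to pass FD events to user space, so avoiding unnecessary memory copies matters a great deal. A common claim is that epoll achieves this by having the kernel and user space share one block of memory via mmap. In the mainline Linux implementation, however, epoll_wait copies the ready events into the buffer the caller passes in. The saving comes from copying only the ready fds, not the whole watched set.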
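4. The epoll API is simpler
epoll is not the only way to overcome the shortcomings of select/poll; it is simply the Linux solution. FreeBSD has kqueue, and /dev/poll is the oldest solution, from Solaris. These two are progressively harder to use, whereas epoll is simpler.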
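epoll in detail (in Python)
Python's select module focuses on I/O multiplexing and provides select, poll and epoll. poll is available on most Unix systems and epoll only on Linux; Windows supports only select, and only for sockets. The module also provides kqueue for BSD systems such as FreeBSD.
select.epoll(sizehint=-1, flags=0): create an epoll object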
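In Python 3.4+, the standard selectors module's DefaultSelector already picks the most efficient mechanism available. To make the platform differences above concrete, here is a hand-rolled sketch of the same idea. best_mechanism is a hypothetical helper, and the preference order is an assumption that mirrors the discussion above.

```python
import select


def best_mechanism():
    """Pick the most scalable readiness API exposed by this Python/OS combination."""
    for name in ("epoll", "kqueue", "devpoll", "poll"):
        if hasattr(select, name):
            return name
    return "select"  # always available, including on Windows (sockets only)


print(best_mechanism())  # epoll on Linux
```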
epoll.close()
Close the control file descriptor of the epoll object.
epoll.closed
True if the epoll object is closed.
epoll.fileno()
Return the file descriptor number of the control fd.
epoll.fromfd(fd)
Create an epoll object from a given file descriptor.
epoll.register(fd[, eventmask])
Register a file descriptor, together with the events to watch for, with the epoll object.
epoll.modify(fd, eventmask)
Modify the event mask of a registered file descriptor.
epoll.unregister(fd)
Remove a registered file descriptor from the epoll object.
epoll.poll(timeout=-1, maxevents=-1)
Wait for events. timeout is in seconds (float); -1 waits indefinitely. The call blocks until an event occurs on a registered fd, then returns a list of (fd, event) tuples: [(fd1, event1), (fd2, event2), ..., (fdn, eventn)].
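Events:
EPOLLERR = 8 ---- error condition
EPOLLET = 2147483648 ---- edge-triggered; the default is level-triggered
EPOLLHUP = 16 ---- hang-up
EPOLLIN = 1 ---- available for read
EPOLLMSG = 1024 ---- ignored
EPOLLONESHOT = 1073741824 ---- one-shot: after one event is pulled out, the fd is internally disabled until re-armed with modify()
EPOLLOUT = 4 ---- available for write
EPOLLPRI = 2 ---- urgent data available for read
EPOLLRDBAND = 128 ---- priority data band can be read
EPOLLRDNORM = 64 ---- equivalent to EPOLLIN
EPOLLWRBAND = 512 ---- priority data may be written
EPOLLWRNORM = 256 ---- equivalent to EPOLLOUT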
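Event masks are plain bit flags: they are combined with | when registering and tested with & on the values poll() returns. A small helper decodes a mask back into the flag names from the table above. describe and FLAG_NAMES are illustrative names.

```python
import select

# Names from the table above, in ascending bit order. EPOLLONESHOT and EPOLLET are
# registration options rather than conditions that poll() reports.
FLAG_NAMES = ["EPOLLIN", "EPOLLPRI", "EPOLLOUT", "EPOLLERR", "EPOLLHUP",
              "EPOLLRDNORM", "EPOLLRDBAND", "EPOLLWRNORM", "EPOLLWRBAND",
              "EPOLLMSG", "EPOLLONESHOT", "EPOLLET"]


def describe(mask):
    """Return the names of the flags from FLAG_NAMES that are set in an epoll mask."""
    return [name for name in FLAG_NAMES if mask & getattr(select, name)]


# A registration mask is built with | ...
mask = select.EPOLLIN | select.EPOLLET | select.EPOLLONESHOT
print(describe(mask))                              # ['EPOLLIN', 'EPOLLONESHOT', 'EPOLLET']
# ... and a reported event is tested with &, as in `event & select.EPOLLIN`.
print(describe(select.EPOLLIN | select.EPOLLHUP))  # ['EPOLLIN', 'EPOLLHUP']
```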
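Level-triggered vs. edge-triggered:
Level-triggered (also called condition-triggered): when a read or write event occurs on a monitored file descriptor, epoll.poll() notifies the handler to read or write. If the data is not fully read or written this time (for example, because the buffer was too small), the next call to epoll.poll() notifies you about that fd again. It keeps doing so for as long as you leave it unread or unwritten. If the system has many ready fds that you do not intend to read or write, they are returned on every call, which makes it considerably less efficient for the handler to find the ready fds it does care about. The advantage is clear: it is stable and reliable.
Edge-triggered: when a read or write event occurs on a monitored file descriptor, epoll.poll() notifies the handler to read or write. If the data is not fully read or written this time (for example, because the buffer was too small), the next call to epoll.poll() will not notify you again. It notifies you only once, and reports that fd again only when a new read or write event occurs on it. This mode is more efficient than level-triggered because the system is not flooded with ready fds you do not care about. The drawback is that it is easy to get wrong. On every notification you must keep reading or writing the (non-blocking) fd until the call would block; otherwise the leftover data goes unreported.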
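The difference is easy to reproduce. In this Linux-only sketch, 10 bytes are sent but only 1 byte is read per notification. Level-triggered epoll keeps reporting the fd, while edge-triggered epoll reports it once because no new data arrives afterwards. This is why edge-triggered code must keep reading a non-blocking socket until it raises BlockingIOError (EAGAIN). count_notifications is a hypothetical helper.

```python
import select
import socket


def count_notifications(edge_triggered, polls=3):
    """Send 10 bytes, read 1 byte per notification, and count how many polls report the fd."""
    a, b = socket.socketpair()
    b.setblocking(False)
    ep = select.epoll()
    try:
        mask = select.EPOLLIN | (select.EPOLLET if edge_triggered else 0)
        ep.register(b.fileno(), mask)
        a.sendall(b"0123456789")
        hits = 0
        for _ in range(polls):
            if ep.poll(0):       # timeout 0: check and return immediately
                hits += 1
                b.recv(1)        # deliberately leave the rest unread
        return hits
    finally:
        ep.close()
        a.close()
        b.close()


print(count_notifications(edge_triggered=False))  # 3: LT keeps reporting unread data
print(count_notifications(edge_triggered=True))   # 1: ET reports only the new arrival
```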
```python
#!/usr/bin/python
# coding:utf-8
import select
import socket

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

sk = socket.socket()
sk.bind(('192.168.110.100', 8080))
sk.listen(5)
sk.setblocking(0)  # set non-blocking mode
epoll = select.epoll()  # create an epoll object
# Watch the listening socket for read events (a new connection arrives as a read event)
epoll.register(sk.fileno(), select.EPOLLIN)

try:
    connections = {}; requests = {}; responses = {}
    while True:
        events = epoll.poll()  # wait until something we care about happens
        # events is a list of (fileno, event code) tuples; fileno is an integer file descriptor
        for fileno, event in events:
            if fileno == sk.fileno():  # event on the server socket, i.e. a new connection: accept it
                connection, address = sk.accept()  # the new connection
                connection.setblocking(0)  # put the socket in non-blocking mode
                epoll.register(connection.fileno(), select.EPOLLIN)  # register its read (EPOLLIN) event
                connections[connection.fileno()] = connection  # keep the socket, keyed by fd
                requests[connection.fileno()] = b''  # data received from the client
                responses[connection.fileno()] = response  # data to send back
            elif event & select.EPOLLIN:  # a read event occurred
                requests[fileno] += connections[fileno].recv(1024)  # read from the client
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:  # end marker: request complete
                    # Once the full HTTP request has arrived, stop watching for reads and watch for
                    # writes (EPOLLOUT), which fire when data can be sent back to the client.
                    epoll.modify(fileno, select.EPOLLOUT)
                    print('-' * 40 + '\n' + requests[fileno].decode()[:-2])
            elif event & select.EPOLLOUT:  # a client socket is writable, so we can send data now
                byteswritten = connections[fileno].send(responses[fileno])  # returns the number of bytes sent
                responses[fileno] = responses[fileno][byteswritten:]  # slice off what was sent
                if len(responses[fileno]) == 0:  # the whole response has been sent
                    epoll.modify(fileno, 0)  # stop watching for both reads and writes
                    connections[fileno].shutdown(socket.SHUT_RDWR)
            elif event & select.EPOLLHUP:  # the client hung up
                epoll.unregister(fileno)  # unregister
                connections[fileno].close()  # close the connection
                del connections[fileno]  # drop the socket object
finally:
    epoll.unregister(sk.fileno())
    epoll.close()
    sk.close()
```
```python
#!/usr/bin/python
# coding:utf-8
import socket

obj = socket.socket()
obj.connect(('192.168.110.100', 8080))
obj.sendall(b'hello\n\r\n')  # must be bytes; '\n\r\n' is the end-of-request marker the server looks for
print(obj.recv(1024))
obj.close()
```
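The demo pair above is hard-wired to 192.168.110.100:8080. To try the same flow (read until the end marker, switch to EPOLLOUT, send, close) on a single Linux machine, the following condensed Python 3 rewrite runs the server loop in a background thread on 127.0.0.1 with an OS-chosen port and points the client at it. It is not the original program. serve_one, run_demo and the bounded max_polls loop are illustrative assumptions.

```python
import select
import socket
import threading

EOL1, EOL2 = b"\n\n", b"\n\r\n"
RESPONSE = (b"HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n"
            b"Content-Length: 13\r\n\r\nHello, world!")


def serve_one(listener, max_polls=20):
    """Serve one client: read until an end marker, then switch the fd to EPOLLOUT and reply."""
    ep = select.epoll()
    ep.register(listener.fileno(), select.EPOLLIN)
    conn, request, pending = None, b"", RESPONSE
    try:
        for _ in range(max_polls):              # bounded so the sketch cannot hang forever
            for fd, event in ep.poll(1.0):
                if fd == listener.fileno():
                    conn, _ = listener.accept()
                    conn.setblocking(False)
                    ep.register(conn.fileno(), select.EPOLLIN)
                elif event & select.EPOLLIN:
                    request += conn.recv(1024)
                    if EOL1 in request or EOL2 in request:
                        ep.modify(fd, select.EPOLLOUT)   # request complete: wait until writable
                elif event & select.EPOLLOUT:
                    pending = pending[conn.send(pending):]
                    if not pending:                     # whole response sent
                        ep.unregister(fd)
                        conn.close()
                        return request
        return None
    finally:
        ep.close()


def run_demo():
    """Run serve_one() in a thread on 127.0.0.1 and talk to it like the demo client."""
    listener = socket.socket()
    listener.bind(("127.0.0.1", 0))       # port 0: let the OS choose a free port
    listener.listen(5)
    listener.setblocking(False)
    result = {}
    worker = threading.Thread(target=lambda: result.update(request=serve_one(listener)))
    worker.start()
    client = socket.create_connection(listener.getsockname())
    client.sendall(b"hello\n\r\n")
    reply = b""
    while True:                           # read until the server closes the connection
        chunk = client.recv(1024)
        if not chunk:
            break
        reply += chunk
    client.close()
    worker.join()
    listener.close()
    return result["request"], reply


request, reply = run_demo()
print(request)                           # b'hello\n\r\n'
print(reply.endswith(b"Hello, world!"))  # True
```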
Real-world code:
```python
#!/usr/bin/python
# coding:utf-8
import select
import socket
import sys
import Queue
import time
import threading
import logging
import datetime
import re, os
import hashlib
sys.path.append('../')
import multiprocessing
from SQLdb import SQLdb
from mylog import MyLog as Log
from communication_packet import Communication_Packet, Communication_Packet_Flags, Error_Info_Flags
from encryption import PrpCrypt
import pdb

'''
Constant        Meaning
EPOLLIN         Available for read
EPOLLOUT        Available for write
EPOLLPRI        Urgent data for read
EPOLLERR        Error condition happened on the assoc. fd
EPOLLHUP        Hang up happened on the assoc. fd
EPOLLET         Set Edge Trigger behavior, the default is Level Trigger behavior
EPOLLONESHOT    Set one-shot behavior. After one event is pulled out, the fd is internally disabled
EPOLLRDNORM     Equivalent to EPOLLIN
EPOLLRDBAND     Priority data band can be read.
EPOLLWRNORM     Equivalent to EPOLLOUT
EPOLLWRBAND     Priority data may be written.
EPOLLMSG        Ignored.
'''


class Server(object):
    def __init__(self, server_IP=None, server_port=None):
        # def __init__(self,server_address = ('112.33.9.154',11366)):
        '''
        Initialize the server's global data
        '''
        # pdb.set_trace()
        # Use the default mode: debug mode
        self.log = Log()
        self.log.openConsole()  # enable console output
        self.logger = self.log.getLog()
        self.dbname = 'sync_test'
        # By default, use the local host IP and port 11366
        if server_IP is None:
            try:
                self.server_IP = self.getlocalIP()
                self.logger.info('Current server_IP: %s' % self.server_IP)
            except:
                self.logger.critical('Get server IP Error!')
                raise
        else:
            self.server_IP = server_IP
        self.server_port = 11366 if server_port is None else server_port
        self.server_address = (self.server_IP, self.server_port)  # set the server address
        self.ListenNum = 100  # maximum number of socket connections to monitor
        self.connections = {}  # current connections
        self.requests = {}  # request data of each connection
        self.addresses = {}  # client addresses
        self.errorInfo = {}  # error info; if an error occurs it is sent back to the client
        self.responseInfo = {}
        self.readthreadRecord = {}
        self.lock = threading.Lock()  # thread lock for data synchronization
        self.db = SQLdb()  # database used for synchronization
        self.setDB('localhost', 'root', '123456', 'sync_test')
        self.readthreadlock = threading.Lock()
        self.EOF = '\n\r\n'
        self.servernum = 'serverxxxxx'
        self.key = '91keytest'
        # set communication user id
        Communication_Packet.set_userid(self.servernum)
        self.encryption = PrpCrypt()
        pass

    def setServerAddree(self, server_ip, server_port):
        '''
        Set server address
        '''
        self.server_address = (server_ip, server_port)  # set the server address

    def setDB(self, host=None, username=None, password=None, dbname=None):
        self.db = SQLdb(host, username, password, dbname)

    def getlocalIP(self):
        '''
        Use the IP of the first network interface as the bind IP
        '''
        try:
            s = os.popen('ifconfig').read()
        except:
            raise
        else:
            ip = re.findall('inet addr:(?<![\.\d])(?:\d{1,3}\.){3}\d{1,3}(?![\.\d])', s)[0].split(':')[1]
            return ip

    def __init_server(self):
        '''
        Initialize the server socket and the epoll object
        '''
        try:
            # pdb.set_trace()
            # Create a TCP/IP socket
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            # put the socket in non-blocking mode
            self.server.setblocking(0)
            # Bind the socket to the port
            self.logger.info('starting up on %s port %s' % self.server_address)
            try:
                self.server.bind(self.server_address)
            except:
                echo = os.popen('''lsof -i :11366 |grep "(LISTEN)" | awk '{printf($1)}' ''').read()
                print 'Port %s is already in use by process %s!' % (self.server_port, echo)
                self.logger.error('Bind on %s port %s Error!' % self.server_address)
                raise
            # Listen for incoming connections
            # self.server.listen(self.ListenNum)
            self.server.listen(1)
            # Set up the epoll
            self.epoll = select.epoll()
            # register the server's read event in edge-triggered mode
            self.epoll.register(self.server.fileno(), select.EPOLLIN | select.EPOLLET)
        except:
            raise Exception('__init_server Error')

    def __jionthreads(self):
        '''
        join the threading
        '''
        # self.logger.debug('Current threadtast is %d ' % len(threading.enumerate()))
        main_thread = threading.currentThread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            else:
                t.join(0)  # non-blocking join
        # self.logger.debug('After joined the threads... %d '% len(threading.enumerate()))

    def __format_str(self, businessFlag, data, endFlag=True, errorFlag=False, hasnewconf=False, versioninfo=''):
        '''
        Format the data to send
        '''
        formatstr = {'BUSINESS_TYPE': businessFlag, 'DATA': data, 'ENDFLAG': endFlag, 'ERRORFLAG': errorFlag,
                     'HASNWECONF': hasnewconf, 'VERSIONINFO': versioninfo}
        return str(formatstr) + self.EOF

    def get_table_filed(self, table_name, db, lock, db_name):
        # pdb.set_trace()
        # from the db get the table field!
        query_detection_version_field_sql = "select COLUMN_NAME from information_schema.COLUMNS where table_name = '%s' and TABLE_SCHEMA = '%s';" % (
            table_name, db_name)
        with self.lock:
            detection_version_fields = self.db.fechdb(query_detection_version_field_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            # Record Error and end task
            DB_ERROR = False
            self.logger.error('----Get %s fileds Error! End this task-----' % table_name)
            return
        else:
            # query result is Unicode,so we need to encode to utf-8
            table_field = [field[0].encode('utf-8') for field in detection_version_fields]
            return table_field

    def calc_md5(self, data):
        return hashlib.md5(data).hexdigest()

    def validating_message_token(self, receving_data):
        # print receving_data
        # pdb.set_trace()
        # print receving_data
        print len(receving_data)
        pre_md5 = receving_data[:16]
        suffix_md5 = receving_data[-16:]
        message_md5 = pre_md5 + suffix_md5
        message = receving_data[16:-16]
        cur_md5 = self.calc_md5(message)
        print cur_md5, message_md5
        if message_md5 == cur_md5:
            return True, message
        else:
            return False, message
        pass

    def validating_content_token(self, content):
        receive_content = content['Business']['Content']
        if not isinstance(receive_content, str):
            receive_content = str(receive_content)
        receive_md5 = content['Info_Status']['Security_Token']
        receive_time = content['Info_Status']['Time']
        cur_md5 = self.calc_md5(receive_content + receive_time + self.key)
        if cur_md5 == receive_md5:
            return True
        else:
            return False
        pass

    def packaging_message(self, Message=None, Error=None, Info_Status=None):
        # ----Pack send message
        # Init Communication_Packet
        cm_packet = Communication_Packet()
        # def set_content_Business(self,b_type,b_content,table_name = None,table_field = None,b_is_syncdb = True):
        cm_packet.set_content_Business(targv=Message)
        # def set_content_Error(self,error_flag = False,error_type = None,error_info = None):
        cm_packet.set_content_Error(targv=Error)
        now_time = str(datetime.datetime.now())  # get current time
        # Business+time+key calculate token,calculate token
        security_token = self.calc_md5(str(cm_packet.CMC_Business) + now_time + self.key)
        # def set_content_Info_Status(self,info_type,security_token,time,is_end):
        # Need to replace the security_token
        Info_Status = list(Info_Status)  # convert the tuple to a list
        Info_Status[1] = security_token
        Info_Status[2] = now_time
        Info_Status = tuple(Info_Status)  # convert back to a tuple to pass as the argument
        cm_packet.set_content_Info_Status(targv=Info_Status)
        try:
            send_data = cm_packet.content
        except Exception, e:
            raise e
        else:
            # we Encryption data
            self.logger.debug(type(send_data))
            encryption_send_data = self.encryption.encrypt(str(send_data))
            # caculate md5
            encrypt_send_md5 = self.calc_md5(encryption_send_data)
            complete_send_data = encrypt_send_md5[:16] + encryption_send_data + encrypt_send_md5[-16:] + self.EOF
            return complete_send_data

    def unpackaging_message(self, unpacking_str):
        # pdb.set_trace()
        if not isinstance(unpacking_str, str):
            raise ValueError
        else:
            unpacking_str = unpacking_str.strip(self.EOF)
            flag, message = self.validating_message_token(unpacking_str)
            if flag:
                decrypt_str = self.encryption.decrypt(message)
                try:
                    message_dict = eval(decrypt_str)
                except Exception, e:
                    self.logger.error('Eval decrypt_str Error!')
                    raise e
                else:
                    if self.validating_content_token(message_dict):
                        return message_dict
                    else:
                        self.logger.error('Message is tampered!')
                        return None
                pass
            else:
                self.logger.error('Message is tampered!')
                return None

    def init_detection_nums(self):
        # get detect_point_nums from server db
        # pdb.set_trace()
        query_sql = "select distinct(sync_point_no) from sync_control"
        BD_ERROR = False
        with self.lock:
            point_nums = self.db.fechdb(query_sql)
            BD_ERROR = self.db.Error
        if BD_ERROR:
            self.logger.error('-----get detect_point_nums error-----')
        if point_nums:
            nums_list = [p[0] for p in point_nums]
            return nums_list
        else:
            return None

    def verification_sync_point_no(self, detection_num):
        detect_point_nums = self.init_detection_nums()
        if not detect_point_nums:
            self.logger.error('-----db config error!-----')
            raise RuntimeError('db config error!')
        if detection_num in detect_point_nums:
            return True
        else:
            return False

    def read_sync_contol_configure(self, table_name, sync_type, sync_point_no, sync_user='server'):
        # Read the control configuration from loacal db,if have not,we sync it from server,then read it again
        # pdb.set_trace()
        qeury_sql = "select * from sync_control where sync_table = '%s' and sync_type = '%s' and sync_user = '%s' and sync_point_no = '%s';" % (
            table_name, sync_type, sync_user, sync_point_no)
        DB_ERROR = False
        # set table name
        sync_table_name = 'sync_control'
        # get `sync_control` table fields
        table_field = self.get_table_filed(sync_table_name, self.db, self.lock, self.dbname)
        if not table_field:
            self.logger.error('-----Get %s table fields Error!-----' % sync_table_name)
            return None
        with self.lock:
            control_configure = self.db.fechdb(qeury_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            self.logger.error('-----Get control configure Error!-----')
            return None
        if control_configure:
            # Get the configure from db! and On the basis of classification of table name and sync type(uploat or download)
            # format the configure to a list
            control_configure_list = []
            for iter_conf in control_configure:
                control_configure_item = dict.fromkeys(table_field)
                lenth = len(table_field)
                # set value for everyone key
                for i in range(lenth):
                    val = iter_conf[i]
                    if isinstance(val, unicode):
                        val = val.encode('utf-8')
                    control_configure_item[table_field[i]] = val
                control_configure_list.append(control_configure_item)
            return control_configure_list

    def parsing_config(self, config):
        # parsing the config to get the sql
        # may modify the logic of the code
        # pdb.set_trace()
        p_conf = dict()
        table_name = config['sync_table']  # Get the download table name
        p_conf['table_name'] = table_name
        table_field = config['sync_field']  # Get sync table field!
        p_conf['sync_type'] = config['sync_type']
        p_conf['sync_range'] = config['sync_range']
        p_conf['sync_range_value'] = config['sync_range_value']
        p_conf['sync_is_update_time'] = config['sync_is_update_time']
        # if table_field is null,we need sync all the field!
        if not table_field:
            table_field = self.get_table_filed(table_name, self.db, self.lock, self.dbname)
            if not table_field:
                self.logger.error(
                    '-----Terminate this task,becase of getting the %s table fileds fialed!-----' % table_name)
                return
        p_conf['table_field'] = table_field
        # Get this operation's type
        try:
            sql_operations = eval(config['sync_operation_type'])
        except Exception, e:
            self.logger.error('-----get sync_operation_type error!-----')
            return
        upside_operate = sql_operations['upside']  # if have, this download operation need carry db info to the server!
        p_conf['upside_operate'] = upside_operate
        downside_operate = sql_operations['downside']  # how to handle the downloaded db info!
        p_conf['downside_operate'] = downside_operate
        update_state_operate = sql_operations['update_state']
        p_conf['update_state_operate'] = update_state_operate
        # Get the sync sql of the corresponding operation
        try:
            sqls = eval(config['sync_sql'])
        except Exception, e:
            self.logger.error('-----get sync_sql error!-----')
            raise
        upside_sqls = sqls['upside']  # a tuple or None
        p_conf['upside_sqls'] = upside_sqls
        downside_sqls = sqls['downside']  # a tuple or None
        p_conf['downside_sqls'] = downside_sqls
        update_state_sqls = sqls['update_state']  # a tuple or None
        p_conf['update_state_sqls'] = update_state_sqls
        # Get the sync patch field of the corresponding operation
        try:
            if config['sync_patch_field']:
                patch_fields = eval(config['sync_patch_field'])
            else:
                pass
        except Exception, e:
            self.logger.error('-----get sync_field error!-----')
            raise
        upside_fields = patch_fields['upside']  # a tuple or None
        p_conf['upside_fields'] = upside_fields
        downside_fields = patch_fields['downside']  # a tuple or None
        p_conf['downside_fields'] = downside_fields
        update_state_fields = patch_fields['update_state']  # a tuple or None
        p_conf['update_state_fields'] = update_state_fields
        # Get the sync_field_value of the corresponding operation
        try:
            if config['sync_patch_field_value']:
                patch_field_values = eval(config['sync_patch_field_value'])
            else:
                pass
        except Exception, e:
            self.logger.error('-----get sync_field_value error!-----')
            return
        upside_patch_field_values = patch_field_values['upside']  # a tuple or None
        p_conf['upside_patch_field_values'] = upside_patch_field_values
        downside_patch_field_values = patch_field_values['downside']  # a tuple or None
        p_conf['downside_patch_field_values'] = downside_patch_field_values
        update_state_patch_field_values = patch_field_values['update_state']  # a tuple or None
        p_conf['update_state_patch_field_values'] = update_state_patch_field_values
        is_carry_state = config['sync_is_carry_state']
        p_conf['is_carry_state'] = is_carry_state
        is_update_state = config['sync_is_update_state']
        p_conf['is_update_state'] = is_update_state
        is_use_state_carry_data = config['sync_is_use_state_carry_data']
        p_conf['is_use_state_carry_data'] = is_use_state_carry_data
        return p_conf

    def __proxy(self, FileNo):
        # start a thread to handle the write event
        newthread = threading.Thread(target=self.__handler_write_event, args=(FileNo,))
        # newthread.daemon = True
        newthread.start()

    def __delevnet(self, FileNo):
        '''
        Unregister events we no longer care about and release the related resources
        '''
        self.logger.info('Start to unregister and close %s socket... ' % FileNo)
        self.epoll.unregister(FileNo)
        self.connections[FileNo].close()
        del self.connections[FileNo]
        del self.requests[FileNo]
        del self.addresses[FileNo]
        del self.errorInfo[FileNo]
        self.logger.info('unregistered and closed the %s socket! ' % FileNo)

    # def __read_from_socket(self,FileNo):
    #     '''
    #     Read data from socket
    #     '''
    #     if self.requests[FileNo]:
    #         return False
    #     else:
    #         try:
    #             while True:
    #                 tmpdata = self.connections[FileNo].recv(4096)
    #                 if not tmpdata:
    #                     return True
    #                 self.requests[FileNo] += tmpdata
    #                 self.logger.debug(len(tmpdata))
    #                 #print tmpdata
    #         except socket.error:
    #             return True
    #         except Exception,e:
    #             raise e

    def __read_from_socket(self, FileNo):
        '''
        Read data from socket
        '''
        try:
            while True:
                tmpdata = self.connections[FileNo].recv(4096)
                # print 'tmpdata: %s' % tmpdata
                # Python (the version used here) has no EPOLLRDHUP. When the client closes the connection
                # or presses Ctrl+C before sending any data, the server gets an EPOLLIN event and reads
                # empty data from the socket... no other solution found!
                if not tmpdata:
                    break
                self.requests[FileNo] += tmpdata
                # self.logger.debug(len(tmpdata))
        except socket.error:
            pass
        except Exception, e:
            raise e

    # error_flag = False,error_type = None,error_info = None
    def __deal_business(self, FileNo):
        # handle the client's business according to the received data
        # pdb.set_trace()
        try:
            message = self.unpackaging_message(self.requests[FileNo])
            # we need reset the requests info
            self.requests[FileNo] = ''
            # self.logger.debug(message)
        except:
            # set the error flag and the error message
            self.logger.error('unpackaging_message Error!')
            self.errorInfo[FileNo] = (Error_Info_Flags.Receive_Data_Error, 'Server recieved data Error!')
            return
        else:
            if message:
                business = message['Business']
                client_id = message['userid']
                error_info = message['Error']
                info_states = message['Info_Status']
                verification_result = self.verification_sync_point_no(client_id)
                if verification_result:
                    # Here we handle the business
                    # 1. get config from the db
                    # read_sync_contol_configure(self,table_name,sync_type,sync_point_no,sync_user):
                    try:
                        table_name = business['Table_Name']
                        sync_type = business['Type']
                        is_sync_flag = business['Is_Syncdb']
                    except:
                        self.errorInfo[FileNo] = (
                            Error_Info_Flags.Receive_Data_Error, 'Business information is incomplete!')
                        return
                    if sync_type == Communication_Packet_Flags.DOLOAD_DB:
                        s_type = 'download'
                    else:
                        s_type = 'upload'
                    b_config = self.read_sync_contol_configure(table_name, s_type, client_id)
                    # pdb.set_trace()
                    if b_config:
                        p_config = [self.parsing_config(conf) for conf in b_config]
                        self.logger.debug(p_config)
                    else:
                        pass
                    if b_config:
                        self.real_business_processing_functions(FileNo, p_config, business, info_states, error_info)
                    else:
                        # set error info!
                        self.errorInfo[FileNo] = (
                            Error_Info_Flags.Server_config_Error, 'Server config is None, give up this task!')
                else:
                    # user authentication failed
                    self.logger.error('-----User authentication failed! userid: %s-----' % client_id)
                    self.errorInfo[FileNo] = (Error_Info_Flags.User_Certification_Error, 'User authentication failed!')
            else:
                # if no message,it means information authenticationfailed!
                self.logger.error('-----Clinet\'s Information authentication failed!-----')
                self.errorInfo[FileNo] = (
                    Error_Info_Flags.Info_Certification_Error, 'Information authentication failed!')

    def calculate_time(self, time_type, time_value):
        # maybe is str,we need to convert it to int type
        time_value = int(time_value)
        # get current time as the end time
        cur_time = datetime.datetime.now()
        hours = 0
        if time_type == 'hour':
            hours = time_value
        elif time_type == 'day':
            hours = time_value * 24
        elif time_type == 'week':
            hours = time_value * 24 * 7
        elif time_type == 'month':
            hours = time_value * 24 * 30
        else:
            self.logger.error('-----time_type Error!-----')
            return None
        # caculate the start time
        start_time = cur_time - datetime.timedelta(hours=hours)
        return (start_time, cur_time)

    # handle the bussiness from the client
    def real_business_processing_functions(self, FileNo, business_config, business, info_states, error_info):
        # pdb.set_trace()
        # according to the config we handle the business
        business_config = business_config[0]
        if info_states['Info_Type'] == Communication_Packet_Flags.REQEST:
            # get bussiness type
            request_bussiness_type = business['Type']
            if request_bussiness_type == Communication_Packet_Flags.UPLOAD_DB:
                request_bussiness_type = 'upload'
            elif request_bussiness_type == Communication_Packet_Flags.DOLOAD_DB:
                request_bussiness_type = 'download'
            else:
                self.errorInfo[FileNo] = (
                    Error_Info_Flags.Client_Data_Pack_Error, 'Request business type error %s' % request_bussiness_type)
                return
            loc_config_sync_type = business_config['sync_type']
            if request_bussiness_type == loc_config_sync_type:
                is_carry_state = business_config['is_carry_state']
                is_use_state_carry_data = business_config['is_use_state_carry_data']
                is_update_state = business_config['is_update_state']
                # handle the download request
                if request_bussiness_type == 'download':
                    # parsing the loacal config
                    up_sql_list = []
                    upside_operates = business_config['upside_operate'].split('|')
                    upside_sqls = business_config['upside_sqls']
                    upside_fields = business_config['upside_fields']
                    upside_patch_field_values = business_config['upside_patch_field_values']
                    sync_range = business_config['sync_range']
                    sync_range_value = business_config['sync_range_value']
                    lenth = len(upside_sqls)
                    for i in range(lenth):
                        sql_part = upside_sqls[i]
                        # if sync_range is not None,we will ignore the other
                        if sync_range:
                            if sync_range == 'period':
                                t_type, t_value = sync_range_value.split(':')
                                s_time, e_time = self.calculate_time(t_type, t_value)
                                qeury_sql = sql_part % (str(s_time), str(e_time))
                            else:
                                qeury_sql = sql_part
                            # add it into the list
                            up_sql_list.append(qeury_sql)
                        else:
                            # we need parsing other configurations
                            if is_use_state_carry_data:
                                try:
                                    # [((u'update_time', u'1970-01-01 00:00:00'),)] limk this
                                    qeury_sql = sql_part % business['Content'][0][0][1]
                                except:
                                    self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'Content Error!')
                            else:
                                qeury_sql = sql_part
                            up_sql_list.append(qeury_sql)
                    query_data = []
                    for u_sql in up_sql_list:
                        BD_ERROR = False
                        with self.lock:
                            res = self.db.fechdb(u_sql)
                            BD_ERROR = self.db.Error
                        if BD_ERROR:
                            self.errorInfo[FileNo] = (
                                Error_Info_Flags.Server_DB_Error, 'Server db Error,SQL: %s' % u_sql)
                            break
                        else:
                            query_data.append(res)
                    self.responseInfo[FileNo] = query_data
                # handle the upload request
                elif request_bussiness_type == 'upload':
                    # pdb.set_trace()
                    # parsing the loacal config
                    content = business['Content']
                    try:
                        self.refresh_the_database(business_config, content)
                    except Exception, e:
                        print e
                        self.errorInfo[FileNo] = (Error_Info_Flags.Server_config_Error, 'Server Config Error!')
                else:
                    self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'bussiness type Error!')
            else:
                self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error,
                                          'server config type is different from client request business type! Error!')
        else:
            self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'Communication_Packet_Flags Error!')

    # update the db
    def refresh_the_database(self, handle_config, db_content):
        '''
        refresh the database,maybe insert、update、delete...
        '''
        # parsing the handle config
        table_name = handle_config['table_name']
        table_field = handle_config['table_field']
        downside_operate = handle_config['downside_operate']
        update_state_operate = handle_config['update_state_operate']
        downside_sqls = handle_config['downside_sqls']
        update_state_sqls = handle_config['update_state_sqls']
        downside_fields = handle_config['downside_fields']
        update_state_fields = handle_config['update_state_fields']
        downside_patch_field_values = handle_config['downside_patch_field_values']
        update_state_patch_field_values = handle_config['update_state_patch_field_values']
        is_update_time = handle_config['sync_is_update_time']
        # pdb.set_trace()
        try:
            table_field = eval(table_field)
            if not table_field:
                table_field = self.get_table_filed(table_name, self.db, self.lock, self.dbname)
            first_field = table_field[0]
        except Exception, e:
            self.logger.error('-----eval table_field error,config is error!-----')
            raise e
        if first_field == 'id':
            is_id = True
        else:
            is_id = False
        download_oprations = downside_operate.split('|')
        if 'file' in download_oprations:
            filename = self.createNewBlackListPath()
            handle_flag = self.handle_file_func(db_content, filename)
            return handle_flag
        # table_field = eval(table_field)
        try:
            is_update_time = int(is_update_time)
        except:
            self.logger.error('-----is_update_time config value error!-----')
            raise
        for db_item in db_content:
            if is_update_time:
                time_index = table_field.index('update_time')
                update_time = (str(datetime.datetime.today()).split('.')[0],)
                db_item = db_item[:time_index] + update_time + db_item[time_index + 1:]
            if is_id:
                rowdata = db_item[1:]
            else:
                rowdata = db_item
            # self.logger.debug(rowdata)
            # print dict(zip(self.phishing_log_fields,rowdata))
            lenth = len(download_oprations)
            for oper in download_oprations:
                # here we get all the patched field value
                # '((fixed,true),(carry,None),(tansfer,None))',
                myindex = download_oprations.index(oper)
                fields_value = []
                # pdb.set_trace()
                for i in range(len(downside_patch_field_values[myindex])):
                    val = downside_patch_field_values[myindex][i]
                    if val[0] == 'fixed':
                        pass
                    elif val[0] == 'carry':
                        pass
                    elif val[0] == 'transfer':
                        field_name = downside_fields[myindex][i]
                        v_index = table_field.index(field_name)
                        tf_value = db_item[v_index]
                        fields_value.append(tf_value)
                        pass
                    else:
                        self.logger.error('-----server downside_patch_field_values Error! valuse: %s------' % str(
                            downside_patch_field_values))
                # pdb.set_trace()
                if fields_value:
                    d_sql, f_val = self.pre_handle_None_value(downside_sqls[myindex],
                                                              self.format_field_value(fields_value))
                    db_sql = self.format_sql(d_sql, f_val)
                else:
                    db_sql = downside_sqls[myindex]
                # pdb.set_trace()
                BD_ERROR = False
                with self.lock:
                    if oper == 'insert':
                        self.db.insertdb(db_sql)
                        BD_ERROR = self.db.Error
                    if oper == 'update':
                        self.db.updatedb(db_sql)
                        BD_ERROR = self.db.Error
                    if oper == 'delete':
                        self.db.deldb(db_sql)
                        BD_ERROR = self.db.Error
                if not BD_ERROR:
                    break
                else:
                    continue
        else:
            return True

    def format_tuple(self, tup):
        '''
        It is None if field in DB is NULL when we get the data from db use mysqldb!
        Format the None to NuLL for inserting data to DB
        '''
        vluelist = ['NULL' if t is None else t for t in tup]
        padlist = ['%s' if t is None else '\'%s\'' for t in tup]
        padstr = ''
        for pl in padlist:
            padstr += pl
            padstr += ','
        else:
            padstr = padstr[:-1]
        return padstr % tuple(vluelist)

    def format_sql(self, patch_sql, patch_field_value):
        if isinstance(patch_sql, str) and isinstance(patch_field_value, tuple):
            try:
                res_sql = patch_sql % patch_field_value
            except:
                res_sql = None
            return res_sql
        else:
            self.logger.error('-----formate_sql args type error-----')
            raise TypeError

    def format_field_value(self, field_value):
        # we neeed hanle the ' or " in the mysql statement
        res_list = list()
        for val in field_value:
            if isinstance(val, unicode):
                val = val.encode('utf-8')
            if isinstance(val, str):
                f_val = val.replace('\'', '\\\'').replace('\"', '\\\"')
            else:
                f_val = val
            res_list.append(f_val)
        return tuple(res_list)

    def get_all_sub_str_index(self, index_str, sub_str, none_indexs):
        # print index_str
        index_list = []
        start_index = 0
        cnt = 0
        while True:
            try:
                tmp_index = index_str.index(sub_str, start_index)
            except:
                break
            else:
                if cnt in none_indexs:
                    index_list.append(tmp_index)
                start_index = tmp_index + len(sub_str)
                cnt += 1
        return tuple(index_list)

    def pre_handle_None_value(self, patch_sql, field_values):
        # get all the None value index
        None_indexs = []
        for i in range(len(field_values)):
            if field_values[i] is None:
                None_indexs.append(i)
        if None_indexs:
            # get '%s' indexs
            s_indexs = self.get_all_sub_str_index(patch_sql, "'%s'", None_indexs)
            str_list = list(patch_sql)
            # pdb.set_trace()
            subtraction_index = 0
            for ix in s_indexs:
                print subtraction_index
                str_list.pop(ix - subtraction_index)
                # print str_list[ix-subtraction_index]
                str_list.pop(ix - subtraction_index + 2)
                subtraction_index += 2
            replace_str = ''.join(str_list)
            # pdb.set_trace()
            # print replace_str
            # pdb.set_trace()
            res_field_values = ['NULL' if f_val is None else f_val for f_val in field_values]
            return replace_str, tuple(res_field_values)
        else:
            return patch_sql, field_values

    def __handler_read_event(self, FileNo):
        self.logger.info('Start handle the recieved data...')
        # process the received data
        try:
            self.__deal_business(FileNo)
        except Exception, e:
            self.logger.error('__deal_business Exception: %s' % e)
            # self.logger.debug(datetime.datetime.now())
            try:
                self.epoll.modify(FileNo, select.EPOLLOUT | select.EPOLLET | select.EPOLLONESHOT)
            except:
                pass
            self.logger.error('Deal_business ERRor')
        else:
            self.modify_revent_to_wevent(FileNo)
        self.logger.info('Handle the recieved data End!')

    # content = dict(sync_point=self.detect_piont_name,sync_point_no = self.detect_piont_serial_number)
    # message_info = (Communication_Packet_Flags.DOLOAD_DB,str(content),True,table_name,table_field)
    # error_info = (False,None,None)
    # message_status = (Communication_Packet_Flags.RESPONSE,None,str(datetime.datetime.now()),True)
    def __handler_write_event(self, FileNo):
        # if errorInfo is not null,we send Error to Client else handing write business
        Error_Info = None
        try:
            Error_Info = self.errorInfo[FileNo]
            # reset error info to None
            self.errorInfo[FileNo] = ''
        except:
            # the socket has already been removed from the list, so just return
            self.logger.info('This socket is removed from error info list!')
            return
        error_info = (False, None, None)
        if Error_Info:
            print Error_Info  # using debug
            error_info = (True, Error_Info[0], Error_Info[1])
        response = self.responseInfo[FileNo]
        # need reset the response info
        self.responseInfo[FileNo] = ''
        res_info = (None, response, None, None, False, None)
        info_states = (Communication_Packet_Flags.RESPONSE, None, None, True)
        message = self.packaging_message(res_info, error_info, info_states)
        self.logger.debug(message)
        self.send_message(FileNo, message, True)  # send the message to client
        self.modify_wevent_to_revent(FileNo)  # modify the event

    def modify_wevent_to_revent(self, FileNo):
        '''
        If we trigger the read envet,we use this function
        '''
        try:
            # We need modify event to read event!
            self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLIN | select.EPOLLONESHOT)
        except:
            pass

    def modify_revent_to_wevent(self, FileNo):
        '''
        If we trigger the write envet,we use this function
        '''
        try:
            self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLOUT | select.EPOLLONESHOT)
        except:
            pass

    def send_message(self, FileNo, message, blocking=True):
        # if message is big,use noblocking it will occur error!
        # so, we maybe set it to blocking
        if FileNo not in self.connections:
            self.logger.debug('This socket not in the connections list!')
            return
        if blocking:
            self.connections[FileNo].setblocking(True)
        try:
            self.connections[FileNo].sendall(message)
        except Exception, e:
            pass
        # last we need to set it to False! we use the noblocking module
        if blocking:
            self.connections[FileNo].setblocking(False)

    def start_server(self):
        try:
            self.__init_server()  # initialize the server
        except:
            # server initialization failed
            self.logger.critical('Init server Error...')
            raise
        while True:
            # join the thread
            self.__jionthreads()
            # Wait for at least one of the sockets to be ready for processing
            self.logger.info('waiting for the next event')
            events = self.epoll.poll()
            for fileno, event in events:
                # Handle inputs
                if fileno == self.server.fileno():
                    try:
                        while True:
                            connection, address = self.server.accept()
                            connection.setblocking(0)  # set the connection to non-blocking mode
                            # Here we can not use select.EPOLLONESHOT flag.This flag
                            # new connections are also registered in edge-triggered mode
                            self.epoll.register(connection.fileno(), select.EPOLLIN | select.EPOLLET)
                            self.connections[connection.fileno()] = connection  # record the connection
                            self.requests[connection.fileno()] = ''  # record the business request
                            self.addresses[connection.fileno()] = address  # record the client address
                            self.errorInfo[connection.fileno()] = ''
                            self.responseInfo[connection.fileno()] = ''
                            # an empty string in errorInfo means there is no error
                            self.logger.info('========================================')
                            self.logger.info('Client %s:%s connected server' % (address))
                    except socket.error:
                        pass
                # elif event & (select.EPOLLIN | select.EPOLLONESHOT):
                elif event & select.EPOLLIN:
                    # Read data from socket untill data is recieved over!
                    self.logger.debug('EVENT EPOLLIN: %s' % hex(event))
                    # pdb.set_trace()
                    try:
                        r_flag = self.__read_from_socket(fileno)
                    except socket.error:
                        pass
                    except Exception, e:
                        # if we catch other Exception, it is to say that we recieved data from client Error!
                        # We need send error data to client!
                        self.logger.warning('Catch other exception when recieve data!')
                        self.errorInfo[fileno] = (
                            Error_Info_Flags.Receive_Data_Error, '-----Server recieved data Error!-----')
                        self.modify_revent_to_wevent(fileno)
                    else:
                        # if it has no exception when eval the data, we think that client data is recieved over.
                        # #then start a new thread to deal with the client data
                        # if not r_flag:
                        #     print '#################################'
                        #     pass
                        # else:
                        if self.requests[fileno]:
                            # Start a new thread to disposal the client requests
                            # self.logger.debug(self.requests[fileno])
                            if self.requests[fileno].endswith(self.EOF):
                                newthread = threading.Thread(target=self.__handler_read_event, args=(fileno,))
                                newthread.daemon = True
                                newthread.start()
                                print 'start %s' % newthread.name
                                # print 'print start new thread'
                        else:
                            # No data was read: the client has closed the connection (hung up)
                            # self.logger.info("closing %s %s (HUP)" % self.addresses[fileno])
                            self.__delevnet(fileno)
                # elif event & (select.EPOLLOUT | select.EPOLLONESHOT):
                elif event & select.EPOLLOUT:
                    self.logger.debug('EVENT EPOLLOUT: %s' % bin(event))
                    # Write event happened,we use proxy to deal
                    # print 'Current file descripter: %d' % fileno
                    # We neet a proxy using a function,but not a threadiing! If threading it has bugs(multi trigger event--I think)
                    self.__proxy(fileno)
                elif event & select.EPOLLHUP:
                    self.logger.debug('EVENT EPOLLHUP: %s' % bin(event))
                    # Client hung up, del event!
```
self.logger.info("closing %s %s (HUP)" % self.addresses[fileno]) self.__delevnet(fileno) elif event & select.EPOLLERR: # self.logger.debug('EVENT: %s' % event) self.logger.info(" exception on %s" % connections[fileno].getpeername()) self.__delevnet(fileno) else: # self.logger.debug('EVENT: %s' % bin(event)) # Other event,do not handle pass if __name__ == '__main__': # pdb.set_trace() myserver = Server() myserver.start_server()
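The server above depends on two epoll rules that are easy to get wrong: in edge-triggered mode (EPOLLET) a non-blocking socket has to be read until it reports EAGAIN, because the same unread bytes will not raise another EPOLLIN; and with EPOLLONESHOT the fd is disabled after each reported event until `epoll.modify` re-arms it, which is what `modify_revent_to_wevent` and `modify_wevent_to_revent` do. The following is a minimal sketch of that read -> write -> read cycle, assuming Python 3 on Linux (unlike the Python 2 listing). It uses `socket.socketpair()` instead of a real listening socket, and the names `drain`, `serve_one_request`, `EOF` and `ONESHOT_ET` are hypothetical, not part of the original server.

```python
import select
import socket

# Hypothetical end-of-request marker, standing in for the server's self.EOF.
EOF = b"\r\n"
# Flags used on every re-arm, mirroring modify_revent_to_wevent / modify_wevent_to_revent.
ONESHOT_ET = select.EPOLLET | select.EPOLLONESHOT


def drain(sock):
    """Read a non-blocking socket until EAGAIN (BlockingIOError).

    Edge-triggered epoll reports a readiness change only once, so bytes left
    unread would not trigger another EPOLLIN. Returns None if the peer closed
    the connection (recv returned b"").
    """
    chunks = []
    while True:
        try:
            data = sock.recv(4096)
        except BlockingIOError:
            break  # kernel buffer is empty: wait for the next edge
        if not data:
            return None
        chunks.append(data)
    return b"".join(chunks)


def serve_one_request(server_side, client_side, message):
    """Run one read -> write -> read cycle on a socket pair and return the reply."""
    ep = select.epoll()
    server_side.setblocking(False)
    ep.register(server_side.fileno(), select.EPOLLIN | ONESHOT_ET)
    client_side.sendall(message)

    buf = b""
    reply = None
    try:
        for _ in range(10):  # bounded so the demo cannot loop forever
            for fileno, event in ep.poll(1.0):
                if event & select.EPOLLIN:
                    data = drain(server_side)
                    if data is None:
                        return None  # peer hung up
                    buf += data
                    # ONESHOT disabled the fd after this event; re-arm it for the next step.
                    if buf.endswith(EOF):
                        ep.modify(fileno, select.EPOLLOUT | ONESHOT_ET)  # request complete
                    else:
                        ep.modify(fileno, select.EPOLLIN | ONESHOT_ET)  # wait for more data
                elif event & select.EPOLLOUT:
                    server_side.sendall(buf.upper())  # toy stand-in for the business logic
                    ep.modify(fileno, select.EPOLLIN | ONESHOT_ET)  # back to reading
                    reply = client_side.recv(4096)
            if reply is not None:
                break
    finally:
        ep.close()
    return reply


if __name__ == "__main__":
    a, b = socket.socketpair()
    print(serve_one_request(a, b, b"hello" + EOF))
    a.close()
    b.close()
```

Re-arming through `epoll.modify` after every event is what stops a second event for the same fd from being delivered while a worker thread is still handling the first one; without EPOLLONESHOT the main loop could hand the same request to two threads.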
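`format_field_value`, `pre_handle_None_value` and the NULL formatter at the top of this listing build SQL text by hand: they escape quotes and strip the quotes around `'%s'` so that None becomes NULL. DB-API drivers can do both jobs if the values are passed separately from the statement. The following is a minimal sketch of that alternative, assuming the standard-library `sqlite3` module as a stand-in for the MySQL driver (its placeholder is `?`; MySQLdb's `cursor.execute(sql, params)` follows the same pattern with `%s`). The table `sync_point`, its columns and `insert_sync_point` are hypothetical.

```python
import sqlite3


def insert_sync_point(conn, values):
    """Insert one row; the driver quotes strings and sends None as NULL.

    Table and column names are hypothetical placeholders.
    """
    conn.execute(
        "INSERT INTO sync_point (name, serial_no, note) VALUES (?, ?, ?)",
        values,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sync_point (name TEXT, serial_no INTEGER, note TEXT)")
# A value containing both quote characters, plus a None: no manual escaping needed.
insert_sync_point(conn, ('O\'Brien "main"', 7, None))
row = conn.execute("SELECT name, serial_no, note IS NULL FROM sync_point").fetchone()
print(row)
```

Because the values never become part of the SQL text, the backslash and quote edge cases that `format_field_value` has to guard against do not arise.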