目录
一、队列
二、生产者消费者模型
三、协程
四、select\poll\epoll
五、paramiko
六、mysql API调用
一、队列(queue)
队列分以下三种:
class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
代码如下:
import queue class Foo(object): def __init__(self,n): self.n = n #先进先出 q = queue.Queue(maxsize=3) # q.get(timeout=3) #当队列为空时,get值会阻塞,加timeout参数后,会报错 # q.get_nowait() #与上面一样的效果 q.put([1,2,3]) q.put(Foo(1)) data = q.get_nowait() data2 = q.get_nowait() print(data,type(data)) print(data2,type(data2)) #输出结果 # [1, 2, 3] <class 'list'> # <__main__.Foo object at 0x000001E060337278> <class '__main__.Foo'> print("-----------------") print(q.full()) #判断队列里数据是否满了,满为True,未满为False。 print(q.qsize())#查看现在队列的大小(队列中有多少个数据) print(q.empty())#判断队列是否为空 print("---------------") #先入后出 q = queue.LifoQueue(maxsize=3) q.put(1) q.put(2) q.put(3) print(q.get_nowait()) print(q.get_nowait()) print(q.get_nowait()) #输出结果 # 3 # 2 # 1 print("---------------") #可设施优先级的队列 q = queue.PriorityQueue(maxsize=5) q.put((15,1)) q.put((5,2)) q.put((7,3)) q.put((1,[1,2,3])) print(q.get()) print(q.get()) print(q.get()) print(q.get()) #输出结果 # (1, [1, 2, 3]) # (5, 2) # (7, 3) # (15, 1)
二、生产者消费者模型
import queue,threading import time,random q = queue.Queue() def producer(name): count = 0 while q.qsize() < 20: time.sleep(random.randrange(4)) q.put(count) print("producer %s has produced %s baozi.." %(name,q.qsize())) count += 1 def consumer(name): count = 0 while count <20: time.sleep(random.randrange(3)) if not q.empty(): data2 = q.get_nowait() print('\033[32;1mconsumer %s has eat %s baozi...\033[0m' %(name,count)) else: print("-----no baozi anymore-----") count += 1 p1 = threading.Thread(target=producer,args=("XiaoLi",)) p2 = threading.Thread(target=producer,args=("XiaoMin",)) c1 = threading.Thread(target=consumer,args=("PangZi",)) p1.start() p2.start() c1.start()
三、协程
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
协程的好处:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
- 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
- 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
- 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
yield实现代码:
import time,queue def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" %(name,new_baozi)) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n+=1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s"%n) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
Greenlet实现代码:
from greenlet import greenlet def test1(): print("12") gr2.switch() print("34") gr2.switch() def test2(): print("56") gr1.switch() print("78") gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
Gevent实现代码:
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
import gevent def foo(): print('\033[31;1mRunning in foo\033[0m') gevent.sleep(1) print("\033[31;1mExplicit context switch to foo again\033[0m") def bar(): print('\033[32;1mRunning in foo\033[0m') gevent.sleep(1) print("\033[32;1mExplicit context switch to foo again\033[0m") def ex(): print('\033[33;1mRunning in foo\033[0m') gevent.sleep(1) print("\033[33;1mExplicit context switch to foo again\033[0m") gevent.joinall( [ gevent.spawn(foo), gevent.spawn(bar), gevent.spawn(ex), ] )
上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。
遇到IO阻塞时会自动切换任务
from gevent import monkey; monkey.patch_all() import gevent from urllib.request import urlopen def f(url): print('GET: %s' % url) resp = urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
通过gevent实现单线程下的多socket并发
server端
import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(s): try: while True: data = s.recv(1024) print("recv:", data) s.send(data) if not data: s.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: s.close() if __name__ == '__main__': server(8001)
client 端
import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"),encoding="utf8") s.sendall(msg) data = s.recv(1024) #print(data) print('Received', repr(data)) s.close()
四、select\selectors\poll\epoll
select、poll、epoll区别:
select:
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import select 5 import socket 6 import sys 7 import queue 8 9 # Create a TCP/IP socket 10 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建socket实例 11 server.setblocking(False) #设置套接字为非阻塞 12 13 # Bind the socket to the port 14 server_address = ('localhost', 20000) #设置通信地址 15 print(sys.stderr, 'starting up on %s port %s' % server_address) #如果有错误,打印错误 16 server.bind(server_address) #绑定通信地址 17 18 # Listen for incoming connections 19 server.listen(5) #最大监听5个链接 20 21 # Sockets from which we expect to read 22 #select()会监控三个通信列表, 23 inputs = [ server ] #定义一个所有的输入 24 25 # Sockets to which we expect to write 26 outputs = [ ] #定义一个所有的输出 27 28 message_queues = {} 29 while inputs: #客户端过来的所有实例,都放到inputs里,要使inputs为True,将server放入 30 31 # Wait for at least one of the sockets to be ready for processing 32 print( '\nwaiting for the next event') 33 readable, writable, exceptional = select.select(inputs, outputs, inputs,2) #将三个list传给select后,select会返回三个新的list,分别为可读,可写,异常。 34 # 35 36 37 # Handle inputs 38 for s in readable: #循环readable,里面有当前所有链接和server,如果server返回到readable里,代表着有新连接,server就绪。 39 40 if s is server: #new connection #如果server就绪,则有新链接 41 # A "readable" server socket is ready to accept a connection 42 connection, client_address = s.accept() #接收新链接 43 print('new connection from', client_address) #打印 44 connection.setblocking(False) #设置新链接为非阻塞 45 inputs.append(connection) #将新连接存入inputs里,当下次select时,检测此链接,若有数据则会返回此链接,则接受 46 47 # Give the connection a queue for data we want to send 48 message_queues[connection] = queue.Queue() #生成一个此链接的队列,放入message_queues字典 49 else: #如果不是server,代表有数据过来 50 data = s.recv(1024) #接受数据 51 if data: #如果data有数据 52 # A readable client socket has data 53 print(sys.stderr, 'received "%s" from %s' % (str(data,'utf8'), s.getpeername()) ) #打印data 54 message_queues[s].put(str(data,'utf8')) #把data放入字典中相应的链接列表 55 # Add output channel for response 56 if s not in outputs: #如果此链接不再outputs里 57 outputs.append(s) #把新的链接放入outputs 58 else: #如果data没有数据,(出错了) 59 # Interpret empty result as closed connection 60 print('closing', client_address, 'after reading no data') #打印 61 # Stop listening for input on the connection 62 if s in outputs: #如果链接在outpus里 63 outputs.remove(s) #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉 64 inputs.remove(s) #inputs中也删除掉 65 s.close() #把这个连接关闭掉 66 67 # Remove message queue 68 del message_queues[s] #把此链接在字典中的列表也删除 69 # Handle outputs 70 for s in writable: #循环发送队列,查找这个链接 71 try: 72 next_msg = message_queues[s].get_nowait() #获取发送的数据 73 except queue.Empty: #如果为空 74 # No messages waiting so stop checking for writability. 75 print('output queue for', s.getpeername(), 'is empty') 76 outputs.remove(s) #将outputs中的这个链接删除 77 else: #如果取到数据 78 print( 'sending "%s" to %s' % (next_msg, s.getpeername())) 79 s.send(bytes(next_msg,'utf8')) #发送数据给客户端 80 # Handle "exceptional conditions" 81 for s in exceptional: #客户端断开后,句柄错误,返回到exceptional 82 print('handling exceptional condition for', s.getpeername() ) 83 # Stop listening for input on the connection 84 inputs.remove(s) #删除inputs中的句柄 85 if s in outputs: #如果句柄在outputs中 86 outputs.remove(s) #将outputs中的句柄删除 87 s.close() #关闭链接 88 89 # Remove message queue 90 del message_queues[s] #删除此链接在message_queues中的列表