三、协程
3.1协程概念
协程:又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程
协程的好处:
-
无需线程上下文切换的开销
-
无需原子操作锁定及同步的开销方便切换控制流,简化编程模型
"原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
-
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
协程的缺点:
-
无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
-
进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
协程定义或标准(满足1,2,3就可称为协程):
-
必须在只有一个单线程里实现并发
-
修改共享数据不需加锁
-
用户程序里自己保存多个控制流的上下文栈
-
一个协程遇到IO操作自动切换到其它协程
“上下文”,指的是程序在执行中的一个状态。通常我们会用调用栈来表示这个状态——栈记载了每个调用层级执行到哪里,还有执行时的环境情况等所有有关的信息。
“上下文切换”,表达的是一种从一个上下文切换到另一个上下文执行的技术。而“调度”指的是决定哪个上下文可以获得接下去的CPU时间的方法。
与线程比较:
1. python的线程属于内核级别的,即由操作系统控制调度(如单线程一旦遇到io就被迫交出cpu执行权限,切换其他线程运行)
2. 单线程内开启协程,一旦遇到io,从应用程序级别(而非操作系统)控制切换
对比操作系统控制线程的切换,用户在单线程内控制协程的切换,优点如下:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu
用yield生成器函数实现单线程下保存程序的运行状态:
import time
def consumer():
r = ''
while True:
n = yield r
print('[CONSUMER] ←← Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
next(c)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] →→ Producing %s...' % n)
cr = c.send(n) # cr="200 ok"
print('[PRODUCER] Consumer return: %s' % cr)
c.close()
if __name__=='__main__':
c=consumer() # c:生成器对象
produce(c)
3.2 greenlet类实现协程
greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的"Coroutine(协程)"的一个基础库.
用greenlet类实现协程举例:
from greenlet import greenlet
def test1():
print (12)
gr2.switch()
print (34)
gr2.switch()
def test2():
print (56)
gr1.switch()
print (78)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
>>:12
56
34
78
3.3 基于greenlet类用 gevent模块实现协程
Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:
用gevent模块实现爬虫
from gevent import monkey monkey.patch_all() import requests,gevent,time def foo(url): respnse=requests.get(url) respnse_str=respnse.text print("GET data %s"%len(respnse_str)) s=time.time() gevent.joinall([gevent.spawn(foo,"https://itk.org/"), gevent.spawn(foo, "https://www.github.com/"), gevent.spawn(foo, "https://baidu.com/")]) print(time.time()-s)
上例中还可以用gevent.sleep(2)来模拟gevent可以识别的i/o阻塞
而time.sleep(2)或其他的阻塞 gevent是不能直接识别的,需要添加补丁,添加补丁代码如下:
from gevent import monkey
monkey.patch_all()
补丁代码必须放在导入其他模块之前,及放在文件开头
附:用进程池、多线程、协程爬虫时间比较
from gevent import monkey monkey.patch_all() import requests import re from multiprocessing import Pool import time,threading import gevent def getpage(res): response_str=requests.get(res) print('ecdoing is :',response_str.encoding) return response_str.text def js(ret): li=[] for item in ret: dic={'title':item[2],'date':item[1],'评论数':item[0]} li.append(dic) f=open('acfun.txt','a',encoding='utf-8') for i in li: f.write(str(i)) f.write('\n') f.close() def run(n): url='http://www.acfun.cn/v/list73/index_%s.htm'%n print(url) response=getpage(url) # response=response.encode('ISO-8859-1').decode('utf-8') obj=re.compile('<span class="a">(\d+)</span>.*?<a href=.*? target=".*?" title="发布于 (.*?)" class="title">(.*?)</a>',re.S) # obj = re.compile(r'<img.*?src=.(\S+\.jpg).*?', re.S) ret=obj.findall(response) # print(ret) return js(ret) if __name__ == '__main__': start_time=time.time() #顺序执行 # start_time=time.time() # for j in range(1,100): # run(j) # #顺序执行cost time: 51.30734419822693 #多线程并发执行 # li=[] # for j in range(1,100): # j = threading.Thread(target=run, args=(j,)) # j.start() # li.append(j) # for obj in li: # obj.join() # 并发执行不使用join cost time: 0.20418000221252441 # 并发执行使用join cost time: 4.524945974349976 #使用进程池 # p = Pool(5) # for i in range(1,100): # p.apply_async(func=run,args=(i,)) # p.close() # p.join() #使用进程池cost time: 6.876262426376343 #使用协程 li = [] for i in range(1, 100): li.append(gevent.spawn(run, i)) gevent.joinall(li) #使用协程第一次cost time: 4.432950973510742 #使用协程第二次cost time: 30.864907264709473 #使用协程第三次cost time: 13.472567558288574 end_time=time.time() print('cost time:', end_time-start_time)