协程实际上是一个线程,执行了多个任务,遇到IO就切换

切换,可以使用yield,greenlet

遇到IO gevent: 检测到IO,能够使用greenlet实现自动切换,规避了IO阻塞问题。

昨天没有讲到的小问题,看下面的例子:

import gevent
def func():
    print('eating')

gevent.spawn(func)  # 协程任务开启

执行程序,没有输出结果

 

加上join

import gevent
def func():
    print('eating')

g = gevent.spawn(func)
g.join()  # 阻塞,等待协程执行完毕

执行输出:eating

 

加上睡眠

import time
import gevent
def func():
    print('eating')

g = gevent.spawn(func)
time.sleep(1)  # 等待1秒

执行程序,没有输出结果

 

使用gevent.sleep()

import gevent
def func():
    print('eating')

g = gevent.spawn(func)
gevent.sleep(1)

执行输出:eating

 

导入monkey模块,再使用内置的time.sleep()

from gevent import monkey;monkey.patch_all()
import time
import gevent
def func():
    print('eating')

g = gevent.spawn(func)
time.sleep(1)

执行输出:eating

 

修改为sleep(0)

from gevent import monkey;monkey.patch_all()
import time
import gevent
def func():
    print('eating')

g = gevent.spawn(func)
time.sleep(0)

执行输出:eating

time.sleep(0) 虽然时间为0,它也是阻塞,gevent检测到了IO

 

总结:

协程任务开启,不会立即执行,它需要IO才能执行

 

from gevent import monkey;monkey.patch_all()
import time
import gevent
def func():
    print('eating1')  # 执行
    time.sleep(0.1)  # 遇到IO
    print('eating2')
    time.sleep(0.1)
    print('eating3')
    time.sleep(0.1)
g = gevent.spawn(func)  # 协程任务开启
time.sleep(0)  # 阻塞遇到IO

执行输出:

eating1

结果为什么是eating1?下面的2和3 为啥不执行呢?

因为time.sleep(0) 比time.sleep(0.1)要快,执行之后,就结束了。

切换到主进程时,发现主进程已经结束了。所以eating2和eating3没有执行。

 

 如果用join,就可以让eating2和earting3执行

from gevent import monkey;monkey.patch_all()
import time
import gevent
def func():
    print('eating1')  # 执行
    time.sleep(0.1)  # 遇到IO
    print('eating2')
    time.sleep(0.1)
    print('eating3')
    time.sleep(0.1)
g = gevent.spawn(func)  # 协程任务开启
#time.sleep(0)  # 阻塞遇到IO
g.join()  # 等待协程结束

执行输出:

eating1
eating2
eating3

 

总结:

内部执行switch时,必须保证协程不结束之前,主线程不结束

 

在昨天的socket例子中,没有用到join
因为accept是永久阻塞,它不需要join

 并发编程,在面试很重要。一定要熟练掌握。

 

一、IO模型介绍                                                                

  为了更好地了解IO模型,我们需要事先回顾下:同步、异步、阻塞、非阻塞

    同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non-blocking IO是一个东西。这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同。所以,为了更好的回答这个问题,我先限定一下本文的上下文。

    本文讨论的背景是Linux环境下的network IO。本文最重要的参考文献是Richard Stevens的“UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2节“I/O Models ”,Stevens在这节中详细说明了各种IO的特点和区别,如果英文够好的话,推荐直接阅读。Stevens的文风是有名的深入浅出,所以不用担心看不懂。本文中的流程图也是截取自参考文献。

    Stevens在文章中一共比较了五种IO Model:
    * blocking IO           阻塞IO
    * nonblocking IO      非阻塞IO
    * IO multiplexing      IO多路复用
    * signal driven IO     信号驱动IO
    * asynchronous IO    异步IO
    由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。

    再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

#1)等待数据准备 (Waiting for the data to be ready)
#2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

 

异步:多个任务,并发执行。再执行一个操作的同时,又有另外一个任务也在执行。

input accept recv recvfrom read(文件)   这些表示长时间IO

sleep join

print send write log.debug   这些表示短暂IO

 

二、阻塞IO(blocking IO)                                                 

  在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

python 全栈开发,Day44(IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制)

  

  当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。

    而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
    所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

    几乎所有的程序员第一次接触到的网络编程都是从listen()、send()、recv() 等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。如下图

    ps:所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。

      python 全栈开发,Day44(IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制)

  实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。

    一个简单的解决方案:

#在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

该方案的问题是:

#开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

改进方案:   

#很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

改进后方案其实也存在着问题:

#“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

   对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

 

python 全栈开发,Day44(IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制)

发送数据,首先由操作系统接收数据
操作系统陷入阻塞,等待数据阶段
因为它不知道,它时候数据来了

上图的几个绿色箭头,都是操作系统做的。
用户态接收到消息,结束阻塞状态
上图是网络IO模型

之前的并发,是多创建了几个进程
那么阻塞在进程中。用户虽然感觉不到,本质上是不能解决IO问题

阻塞IO是比较低效的
所有的阻塞都不会调用CPU

 

IO多的线程:比如任务过多时,会耗费CPU
协程 500个任务,不耗费CPU,切换速度非常快。
所以协程比线程快
如果是高计算型的,需要用到多进程,在多进程,再起协程
这样效率会更高,它会规避很多IO操作
如果任务比较少,起线程和协程区别很小。但是协程会更好一点

 

总结:
协程能解决的事情,不要用线程
在其他语言里面:
    多进程 数据隔离 可以利用多核
    多线程 数据不隔离,可以利用多核
    协程 数据不隔离 不能利用多核
    所以,协程很鸡肋

在Cpython解释器下的python语言
    多进程 数据隔离 可以利用多核
    多线程 数据不隔离,不能利用多核
    协程 数据不隔离 不能利用多核
    但是协程的优势大于线程。在Cpython下,效率非常快。

 

三、非阻塞IO(non-blocking IO)                                     

Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

  python 全栈开发,Day44(IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制)

  从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

    也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

    所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

#服务端
from socket import *
import time
s=socket(AF_INET,SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
conn_l=[]
del_l=[]
while True:
    try:
        conn,addr=s.accept()
        conn_l.append(conn)
    except BlockingIOError:
        print(conn_l)
        for conn in conn_l:
            try:
                data=conn.recv(1024)
                if not data:
                    del_l.append(conn)
                    continue
                conn.send(data.upper())
            except BlockingIOError:
                pass
            except ConnectionResetError:
                del_l.append(conn)

        for conn in del_l:
            conn_l.remove(conn)
            conn.close()
        del_l=[]

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
非阻塞IO实例

相关文章:

  • 2021-09-25
  • 2021-09-05
  • 2021-12-26
  • 2021-11-19
猜你喜欢
  • 2021-06-10
  • 2021-04-22
  • 2021-12-01
  • 2021-09-08
  • 2021-05-31
  • 2021-04-14
相关资源
相似解决方案