一、线程:
1.线程基础
python编程,调用的是threading模块,调用方式有:
方式一:
import threading import time def sayhi(num): #定义每个线程要运行的函数 print("running on number:%s" %num) time.sleep(3) if __name__ == \'__main__\': #target 是目标函数;args是函数的参数,必须是可迭代的;name是线程名 t1 = threading.Thread(target=sayhi,args=(1,),name=i) #生成一个线程实例 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例 t1.start() #启动线程 t2.start() #启动另一个线程 print(t1.getName()) #获取线程名 print(t2.getName())
方式二:
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#覆写run()方法 #方法中定义程序内容 print("running on number:%s" %self.num) time.sleep(3) if __name__ == \'__main__\': t1 = MyThread(1) #生成线程 t2 = MyThread(2) t1.start() #启动线程 t2.start()
上面的调用方式,只能手动去生成多个线程,有没有可以批量生成线程的方式呢?答案是有的,如下:
import threading import time def worker(num): time.sleep(1) print("Thread %d" % num) for i in range(5): #主线程会在此处创建5个线程 t = threading.Thread(target=worker, args=(i,),name ="t. % d" % i) #target 是目标函数;args是函数的参数,必须是可迭代的;name是线程名 t.start() #启动线程 print("main over")
执行结果:
main over Thread 1 Thread 0 Thread 4 Thread 2 Thread 3
明明我们执行worker()方法是在for循环里,从代码解析角度来说,worker()方法应该比print("main over ")先执行,为什么执行的结果里,却是先输出了main over 然后再执行了worker()??
其实答案很简单,我们在python开发【第二十二篇】里讲过,程序在执行时,会由进程创建一个主线程,然后主线程再去创建子线程,这就说明,程序开始前就已经有个主线程在执行程序,然后需要循环说让再创建5个线程,然后主线程再创建了5个线程,那么此时进程中有6个线程。5个线程执行worker()方法,一个线程继续执行程序主线,因为worker()睡眠了1s,所以会先输出main over。
但是,有时候我们需要主线程等待子线程全部执行结束了再往下执行
#_*_coding:utf-8_*_ import threading import time def run(num): global NUM #注意这儿一定要加 global 。不然系统会将NUM定义为局部变量。print( " hi i am thread %s ...lalala " % num) NUM += 1
time.sleep(1) NUM =0 p_list = [] for i in range(30): t = threading.Thread(target=run,args=(i,)) t.start() p_list.append(t) #我们自己实现并行,先让并发线程执行,加到列表等待我们取结果即可 #t.jion() # 等待一个线程结束才会执行第二个线程,这样就成了串行,而不是并行了 for i in p_list: # i.join() #取出我们上面放入的结果。 但是串行取出 print (\'---->\',NUM) #由于加入列表时候是并发加入的,数字没有先后,所以打印结果i的时候也就看到没有顺序。最后打印的的i会导致 NUM的变化
既然有时候我们需要主线程等待子线程的完成。<称之为:前台线程>
那么就会有时候需要主线程完后,不等待子线程完成,立即完成当前任务。<称之为:后台线程/守护线程>
2、守护线程:
在java中:
import threading,time def run(n): print("task-",n,threading.currentThread()) #threading.currentThread()获取当前线程名 time.sleep(1) print("task done-", n) if __name__=="__main__": for i in range(5): t=threading.Thread(target=run,args=(i,)) t.setDaemon(True) #设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用 t.start() print("thread done",threading.current_thread())
执行结果:
task- 0 <Thread(Thread-1, started daemon 8964)> task- 1 <Thread(Thread-2, started daemon 6212)> task- 2 <Thread(Thread-3, started daemon 1592)> task- 3 <Thread(Thread-4, started daemon 7088)> task- 4 <Thread(Thread-5, started daemon 8104)> thread done <_MainThread(MainThread, started 1752)> #主线程
由上面可知道,1.函数的主流程,就是主线程。其他我们定义的线程,都是子线程
2.守护线程的语句要写在start()前,将线程定义为守护线程后,并开启。
3.非守护线程结束,守护线程也跟着结束。所以我们定义的print("task done-", n)语句未能执行。
thread方法说明
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon() : 判断是否为守护线程
t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run() :线程被cpu调度后自动执行线程对象的run方法
3、GIL全称Global Interpreter Lock
为什么存在GIL
由于物理上得限制,各CPU厂商在核心频率上的比赛已经被多核所取代。为了更有效的利用多核处理器的性能,就出现了多线程的编程方式,而随之带来的就是线程间数据一致性和状态同步的困难。即使在CPU内部的Cache也不例外,为了有效解决多份缓存之间的数据同步时各厂商花费了不少心思,也不可避免的带来了一定的性能损失。
Python为了完全支持多线程编程, 但是python解释器的C语言实现部分在完全并行执行时并不是线程安全的。 于是就有了GIL,解释器被全局解释器锁(GIL)保护着,它确保任何时候都只有一个Python线程执行。 GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。
但是有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。 如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待,在等待过程中,当前线程会释放GIL锁。所以,你完全可以放心的创建几千个Python线程, 现代操作系统运行这么多线程没有任何压力,没啥可担心的。
故:
1.计算密集型:CPU操作密集的,使用多进程(见python开发【第二十四篇】)
2.IO密集型:使用多线程
4、线程锁(互斥锁Mutex)
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?
import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 print(\'--get num:\',num ) time.sleep(1) num -=1 #对此公共变量进行-1操作 num = 100 #设定一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print(\'final num:\', num )
正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。
*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁
import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 print(\'--get num:\',num ) time.sleep(1) lock.acquire() #修改数据前加锁 num -=1 #对此公共变量进行-1操作 lock.release() #修改后释放 num = 100 #设定一个共享变量 thread_list = [] lock = threading.Lock() #生成全局锁 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print(\'final num:\', num )
GIL VS Lock
机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 具体我们通过下图来看一下,就明白了。
首先我们需要明白,这里的lock是用户级的lock,是用来锁定一个变量谁可以用。GIL是python解释器的锁,是决定哪个线程可以使用python解释器。
上图解释:
在进程内,有多个线程,且有一个全局变a量count,现在对count执行加一。
1.线程一获取到count=0,然后申请到GIL锁,
2.cpu执行线程一,但是该线程太难执行完了,在cpu当前时间片用完该线程都没有执行完,那么就会进行上下文切换。当前数据信息放到寄存器中等等地方。GIL锁释放。
3.线程二获取count数据,因为线程一没有执行完,所以count数据并没有成功加1,那么导致现在count依然等于0。现在线程2有拿着count=0申请到GIL锁,前往cpu执行。
4.假设线程2一次性执行完了,那么当前count=1。
5当线程2释放了GIL锁后,线程一会直接拿着上次执行结果继续申请GIL锁,并完成执行。因线程一在获取count时是等于0,所以线程一执行完后,count依然还是1没有变成2。
那你又问了, 既然用户程序已经自己有锁了,那为什么C python还需要GIL呢?加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这可以说是Python早期版本的遗留问题。
5、RLock(递归锁)
说白了就是在一个大锁中还要再包含子锁
import threading,time def run1(): print("grab the first part data") lock.acquire() global num num +=1 lock.release() return num def run2(): print("grab the second part data") lock.acquire() global num2 num2+=1 lock.release() return num2 def run3(): lock.acquire() res = run1() print(\'--------between run1 and run2-----\') res2 = run2() lock.release() print(res,res2) if __name__ == \'__main__\': num,num2 = 0,0 lock = threading.RLock() #定义递归锁。不能定义为lock = threading.Lock()这样会报错的 for i in range(10): t = threading.Thread(target=run3) t.start() while threading.active_count() != 1: print(threading.active_count()) else: print(\'----all threads done---\') print(num,num2)
Lock与Rlock 对比
#coding:utf-8 import threading lock = threading.Lock() #Lock对象 lock.acquire() lock.acquire() #产生了死锁。 lock.release() lock.release() print lock.acquire() import threading rLock = threading.RLock() #RLock对象 #推荐 rLock.acquire() rLock.acquire() #在同一线程内,程序不会堵塞。 rLock.release() rLock.release()
综上所述,为了避免代码的错误,所以不论是需要多重锁还是不需要,都推荐直接写 rLock = threading.RLock()
6、Semaphore(信号量)(简单理解:线程池)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
import threading, time def run(n): semaphore.acquire() time.sleep(1) print("run the thread: %s---%s\n" % (threading.current_thread(),threading.active_count())) semaphore.release() if __name__ == \'__main__\': num = 0 semaphore = threading.BoundedSemaphore(3) # 最多允许3个线程同时运行。但是定义的10个线程是同时开启的 for i in range(10 ): t = threading.Thread(target=run, args=(i,)) t.start() while threading.active_count() != 1: pass #print (threading.active_count()) else: print(\'----all threads done---\') print(num)
执行结果:
run the thread: <Thread(Thread-1, started 7164)>---11 run the thread: <Thread(Thread-3, started 3260)>---10 run the thread: <Thread(Thread-2, started 1948)>---9 run the thread: <Thread(Thread-6, started 5820)>---8 run the thread: <Thread(Thread-4, started 2968)>---8 run the thread: <Thread(Thread-5, started 2468)>---8 run the thread: <Thread(Thread-7, started 5620)>---5 run the thread: <Thread(Thread-9, started 4888)>---5 run the thread: <Thread(Thread-8, started 7064)>---5 run the thread: <Thread(Thread-10, started 4440)>---2 ----all threads done--- 0
7、evnet(事件):
Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()为False时就阻塞当前线程,知道结束阻塞状态
Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。
构造方法:
Event()
实例方法:
isSet(): 当内置标志为True时返回True。
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
clear(): 将标志设为False。
wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
# encoding: UTF-8 import threading import time event = threading.Event() #定义时间 def func(): # 等待事件,进入等待阻塞状态 print \'%s wait for event...\' % threading.currentThread().getName() event.wait() #在默认状态下为False。 当为False时代码将不会执行下面的语句,将阻塞在这儿 # 收到事件后进入运行状态 print \'%s recv event.\' % threading.currentThread().getName() t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2) # 发送事件通知 print \'MainThread set event.\' event.set()
执行结果:
Thread-1 wait for event... Thread-2 wait for event... #2秒后。。。 MainThread set event. Thread-1 recv event. Thread-2 recv event. Process finished with exit code 0
通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。
import threading,time import random def light(): event.set() #这句会将threading模块中self._flag 设置为True。 #假设我们定义True为绿灯 #这个就叫设置事件 count = 0 while True: if count < 10: print(\'\033[42;1m--green light on---\033[0m\') elif count <20: if event.isSet(): #判断event是否被设置。 event.clear() #将事件清空,即将threading模块中self._flag 设置为False print(\'\033[41;1m--red light on---\033[0m\') else: count = 0 event.set() #打开绿灯 time.sleep(1) count +=1 def car(n): while 1: time.sleep(random.randrange(5)) if not event.isSet(): #判断事件是否设置 print("car [%s] is waiting for the red light.." % n) else: print("car [%s] is running.." % n) """ def car(n): while 1: time.sleep(random.randrange(5)) #设置事件是否等待。判断依据:event.set()方法更改的self._flag。event.wait()返回的结果就是self._flag值 if not event.wait(): #这种写法的问题在于:当event.wait()为False时,该线程会被阻塞并等待到阻塞结束 print("car [%s] is waiting for the red light.." % n) else: print("car [%s] is running.." % n) """ if __name__ == \'__main__\': event = threading.Event() Light = threading.Thread(target=light) Light.start() for i in range(2): t = threading.Thread(target=car,args=(i,)) t.start()
8、queue队列
如前所述,当多个线程需要共享数据或者资源的时候,可能会使得线程的使用变得复杂。线程模块提供了许多同步原语,包括信号量、事件和锁。当这些选项存在时,最佳实践是转而关注于使用队列。相比较而言,队列更容易处理,并且可以使得线程编程更加安全,因为它们能够有效地传送单个线程对资源的所有访问,并支持更加清晰的、可读性更强的设计模式。
那什么是队列呢?
举例来说,我们去加油站加油。此时车就会自动排成一排形成这一排就是队列。
如果此时加油站就一个加油台(相当于线程),那么所有的车(相当于数据)就只能排着,同时人(相当于线程)也要跟随车在这儿等着。这种队列会形成线程的同步阻塞。
那么又如我们去人员爆满的高端酒店住宿,我们到了酒店门口车太多,就得排队。但是因为酒店有专门的3位停靠司机(处理队列数据的多个线程),所以我们(需要数据的线程)可以直接下车(数据、资源)并去酒店办理业务,而让车一直在那儿排着就行,由3位司机依次将队伍的车停完。停完了车然后再把钥匙交给我们,我们需要车的时候再去取就行。这种队列就是我们python中的队列。
特点:1.队列的数据可以由指定的多个线程分别同步处理
2.发送数据进队列的线程不用等待数据被执行。
3.线程安全
queue的作用:
1.解耦合(解除线程间的关系),
2.提高效率
queue队列模式:
class queue.Queue(maxsize=0)
-
构造一个FIFO(first in first out 先进先出)队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
- class queue.LifoQueue(maxsize=0)
-
构造一个LIFO(先进后出)队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
出现于版本2.6.
- class queue.PriorityQueue(maxsize=0)
-
构造一个优先队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
下面以先进先出队列的为例(另外两个都一样):
在 Python 2中,queue模块被重命名为 Queue 。其他都一样。请猛戳:queue
import queue q = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。 q.join() # 等到队列为kong的时候,再执行别的操作,同q.task_done()一起操作
q.task_done() #用来检索队列中是否还有元素,如果没有了,就告诉q.join()结束阻塞 (具体用法见消费者-生产者模型)
q.qsize() # 返回队列的大小 (不可靠) q.empty() # 当队列为空的时候,返回True 否则返回False (不可靠) q.full() # 当队列满的时候,返回True,否则返回False (不可靠) q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置, 为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后, 如果队列无法给出放入item的位置,则引发 queue.Full 异常 q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞, 若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。 q.put_nowait(item) # 等效于 put(item,block=False) q.get_nowait() # 等效于 get(item,block=False)看下面的例子如果队列里没有值了怎么办?他会等待直到有数据为止:
View Codeimport queue q = queue.Queue() # 调用队列生成对象 q.put(1) #存放第一个值到队列 q.put(2) #存放第二个值到队列 a = q.get() # 获取队列的第一个值 print(\'get frist one:%s\' % a) b = q.get() # 获取队列的第二个值 print(\'get second one:%s\' % b) c = q.get() # 获取队列的第三个值 #因为此时没有第三个值所以就会形成阻塞,一直等到put()进去第三个值 print(\'get third one:%s\' % c) #结果: \'\'\' get frist one:1 get second one:2 #这里没有获取到值堵塞住,一直在等待着值进来~ \'\'\'
- 如果不想让他等待,不管是否队列里都取数据,可以使用
get_nowait(),但是如果队列中没有数据就会报错!
View Codeimport queue q = queue.Queue() # 调用队列生成对象 q.put(1) #存放第一个值到队列 q.put(2) #存放第二个值到队列 try: a = q.get() # 获取队列的第一个值 print(\'get frist one:%s\' % a) b = q.get() # 获取队列的第二个值 print(\'get second one:%s\' % b) c = q.get_nowait() # 获取队列的第三个值,使用:get_nowait() 不堵塞! #等同于q.get(block=False) print(\'get third one:%s\' % c) except queue.Empty as q_error: print(\'The Queue is empty!\')
- 如果队列为空的时候可以通过异常处理进行捕获:
View Codeimport queue q = queue.Queue() # 调用队列生成对象 q.put(1) #存放第一个值到队列 q.put(2) #存放第二个值到队列 try: a = q.get() # 获取队列的第一个值 print(\'get frist one:%s\' % a) b = q.get() # 获取队列的第二个值 print(\'get second one:%s\' % b) c = q.get_nowait() # 获取队列的第三个值,使用:get_nowait() 不堵塞! print(\'get third one:%s\' % c) except queue.Empty as q_error: print(\'The Queue is empty!\')
- 同样的如果队列长度为2,如果队列满了之后,同样他也是等待,直到有位置才会继续如下代码:
View Codeimport queue q = queue.Queue(2) # 调用队列生成对象,2:设置队列长度为2 q.put(1) # 存放第一个值到队列 print(\'put value 1 done\') q.put(2) # 存放第二个值到队列 print(\'put vlaue 2 done\') q.put(3) # 存放第三个值到队列 print(\'put value 3 done\') #结果: \'\'\' put value 1 done put vlaue 2 done #这里会一直等待~ \'\'\'
- 同样如果存放数值的时候如果不想让他等待,使用
put_nowait()但是队列无法存放后会报错!
View Codeimport queue q = queue.Queue(2) # 调用队列生成对象,2:设置队列长度为2 q.put(1) # 存放第一个值到队列 print(\'put value 1 done\') q.put(2) # 存放第二个值到队列 print(\'put vlaue 2 done\') # q.put(33, block=False) # 不堵塞 # q.put(33, block=False, timeout=2) # 不堵塞,等待2秒 q.put_nowait(3) # 存放第三个值到队列,使用:put_nowait() 不堵塞! #等同于q.put(block=False) print(\'put value 3 done\')
更多详解:猛戳这里
- 如果不想让他等待,不管是否队列里都取数据,可以使用
生产者消费者模型:
在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产 生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商 品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模型。结构图如下:
生产者消费者模型的优点:
1、解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化, 可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
举个例子,我们去邮局投递信件,如果不使用邮筒(也就是缓冲区),你必须得把信直接交给邮递员。有同学会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须 得认识谁是邮递员,才能把信给他(光凭身上穿的制服,万一有人假冒,就惨了)。这就产生和你和邮递员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员换人了,你还要重新认识一下(相当于消费者变化导致修改生产者代码)。而邮筒相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
2、支持并发
由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区了拿数据即可,这样就不会因为彼此的处理速度而发生阻塞。
接上面的例子,如果我们不使用邮筒,我们就得在邮局等邮递员,直到他回来,我们把信件交给他,这期间我们啥事儿都不能干(也就是生产者阻塞),或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。
3、支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。 等生产者的制造速度慢下来,消费者再慢慢处理掉。
为了充分复用,我们再拿寄信的例子来说事。假设邮递员一次只能带走1000封信。万一某次碰上情人节(也可能是圣诞节)送贺卡,需要寄出去的信超过1000封,这时 候邮筒这个缓冲区就派上用场了。邮递员把来不及带走的信暂存在邮筒中,等下次过来 时再拿走。
简单的生产者消费者模型:
import queue import threading import time q = queue.Queue() # 生成者(client) def productor(arg): # 序号加包子,将做好的包子放到篮子(队列)里 q.put(str(arg) + \'包子\') print("product",arg,"包子") """ join() 保持阻塞状态,直到处理了队列中的所有项目为止。在将一个项目添加到该队列时,未完成的任务的总数就会增加。 当使用者线程调用 task_done() 以表示检索了该项目、并完成了所有的工作时,那么未完成的任务的总数就会减少。当未完成的任务的总数减少到零时,join()就会结束阻塞状态。 """ q.join() #用来判断队列中是否为空,如果为空,则继续执行下面的代码。否则下面的代码阻塞 print( "包子被吃完") # 创建30个包子 for i in range(4): t = threading.Thread(target=productor, args=(i,)) t.start() # ============================================================== # # 消费者(server) def consumer(arg): while True: # arg(0-3)吃包子得人, q.get()从篮子(队列)里取包子,包子有序号 print(arg, "eat",q.get()) time.sleep(1.5) q.task_done() #用来判断队列中是否为空 # 三个线程一起吃包子 for j in range(2): t = threading.Thread(target=consumer, args=(j,)) t.start()
执行结果:
product 0 包子 product 1 包子 product 2 包子 product 3 包子 0 eat 0包子 1 eat 1包子 0 eat 2包子 1 eat 3包子 包子被吃完 包子被吃完 包子被吃完 包子被吃完 #在这里一直等待put进元素
9.线程池:
python内部是没有封装线程池的模块,需要自己定义。定义方式就是使用for循环来生成固定数量的线程数。