进程
multiprocess
Process —— 进程 在python中创建一个进程的模块
    start
    daemon 守护进程
    join 等待子进程执行结束

锁 Lock
acquire release
锁是一个同步控制的工具
如果同一时刻有多个进程同时执行一段代码,
那么在内存中的数据是不会发生冲突的
但是,如果涉及到文件,数据库就会发生资源冲突的问题
我们就需要用锁来把这段代码锁起来
任意一个进程执行了acquire之后,
其他所有的进程都会在这里阻塞,等待一个release

信号量 semaphore
锁 + 计数器
同一时间只能有指定个数的进程执行同一段代码

事件 Event
set clear is_set   控制对象的状态
wait  根据状态不同执行效果也不同
    状态是True ---> pass
    状态是False --> 阻塞
一般wait是和set clear放在不同的进程中
set/clear负责控制状态
wait负责感知状态
我可以在一个进程中控制另外一个或多个进程的运行情况

IPC通信
队列 Queue
管道 PIPE

一、进程间通信(队列和管道)                                           

判断队列是否为空

from multiprocessing import Process,Queue
q = Queue()
print(q.empty())

执行输出:True

 

判断队列是否满了

from multiprocessing import Process,Queue
q = Queue()
print(q.full())

执行输出:False

 

如果队列已满,再增加值的操作,会被阻塞,直到队列有空余的

from multiprocessing import Process,Queue
q = Queue(10)  # 创建一个只能放10个value的队列
for i in range(10):
    q.put(i)  # 增加一个value
print(q.qsize())  # 返回队列中目前项目的正确数量
print(q.full())  # 如果q已满,返回为True
q.put(111)  # 再增加一个值
print(q.empty())

执行输出:

10
True

从结果中,可以看出,下面的操作q.put(111)之后的代码被阻塞了。

总结:

队列可以在创建的时候制定一个容量
如果在程序运行的过程中,队列已经有了足够的数据,再put就会发生阻塞
如果队列为空,在get就会发生阻塞

为什么要指向队列的长度呢?是为了防止内存爆炸。
一个队列,不能无限制的存储。毕竟,内存是有限制的。

python 全栈开发,Day40(进程间通信(队列和管道),进程间的数据共享Manager,进程池Pool)

 

上面提到的put,get,qsize,full,empty都是不准的。
因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

 

import time
from multiprocessing import Process,Queue
def wahaha(q):
    print(q.get())
    q.put(2)  # 增加数字2

if __name__ == '__main__':
    q = Queue()
    p = Process(target=wahaha,args=[q,])
    p.start()
    q.put(1)  # 增加数字1
    time.sleep(0.1)
    print(q.get())

执行输出:

1
2

先执行主进程的q.get(),再执行子进程的q.get()

在进程中使用队列可以完成双向通信

队列是进程安全的 内置了锁来保证队列中的每一个数据都不会被多个进程重复取

python 全栈开发,Day40(进程间通信(队列和管道),进程间的数据共享Manager,进程池Pool)

在同一时刻,只能有一个进程来取值,它内部有一个锁的机制。那么另外一个进程就会阻塞一会,但是阻塞的时间非常短
队列能保证数据安全,同一个数据,不能被多个进程获取。

 

生产者消费者模型
解决数据供需不平衡的情况

from multiprocessing import Process,Queue
def producer(q,name,food):
    for i in range(5):
        print('{}生产了{}{}'.format(name,food,i))

if __name__ == '__main__':
    q = Queue()
    Process(target=producer,args=[q,'康师傅','红烧牛肉']).start()
    Process(target=producer,args=[q,'郑师傅','红烧鱼块']).start()

执行输出:

康师傅生产了红烧牛肉0
康师傅生产了红烧牛肉1
康师傅生产了红烧牛肉2
康师傅生产了红烧牛肉3
康师傅生产了红烧牛肉4
郑师傅生产了红烧鱼块0
郑师傅生产了红烧鱼块1
郑师傅生产了红烧鱼块2
郑师傅生产了红烧鱼块3
郑师傅生产了红烧鱼块4

 

增加一个消费者

import time
import random
from multiprocessing import Process,Queue
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.random())  # 模拟生产时间
        print('{}生产了{}{}'.format(name,food,i))
        q.put('{}{}'.format(food,i))  # 放入队列
        
def consumer(q,name):
    for i in range(10):
        food = q.get()  # 获取队列
        time.sleep(random.random())  # 模拟吃的时间
        print('{}吃了{}'.format(name,food))

if __name__ == '__main__':
    q = Queue()
    Process(target=producer,args=[q,'康师傅','红烧牛肉']).start()
    Process(target=producer,args=[q,'郑师傅','红烧鱼块']).start()
    Process(target=consumer,args=[q,'xiao']).start()

执行输出:

郑师傅生产了红烧鱼块0
xiao吃了红烧鱼块0
康师傅生产了红烧牛肉0
xiao吃了红烧牛肉0
康师傅生产了红烧牛肉1
郑师傅生产了红烧鱼块1
xiao吃了红烧牛肉1
康师傅生产了红烧牛肉2
郑师傅生产了红烧鱼块2
康师傅生产了红烧牛肉3
郑师傅生产了红烧鱼块3
康师傅生产了红烧牛肉4
xiao吃了红烧鱼块1
郑师傅生产了红烧鱼块4
xiao吃了红烧牛肉2
xiao吃了红烧鱼块2
xiao吃了红烧牛肉3
xiao吃了红烧鱼块3
xiao吃了红烧牛肉4
xiao吃了红烧鱼块4

 

消费者,必须是有的吃,才能吃。没有吃的,就等着。
一个消费者,明显消费不过来。再加一个消费者

import time
import random
from multiprocessing import Process,Queue
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.random())  # 模拟生产时间
        print('{}生产了{}{}'.format(name,food,i))
        q.put('{}{}'.format(food,i))  # 放入队列

def consumer(q,name):
    for i in range(5):  # 修改为5,因为有2个人
        food = q.get()  # 获取队列
        time.sleep(random.random())  # 模拟吃的时间
        print('{}吃了{}'.format(name,food))

if __name__ == '__main__':
    q = Queue()
    Process(target=producer,args=[q,'康师傅','红烧牛肉']).start()
    Process(target=producer,args=[q,'郑师傅','红烧鱼块']).start()
    Process(target=consumer,args=[q,'xiao']).start()
    Process(target=consumer, args=[q,'lin']).start()

执行输出:

康师傅生产了红烧牛肉0
郑师傅生产了红烧鱼块0
xiao吃了红烧牛肉0
郑师傅生产了红烧鱼块1
康师傅生产了红烧牛肉1
lin吃了红烧鱼块0
郑师傅生产了红烧鱼块2
康师傅生产了红烧牛肉2
郑师傅生产了红烧鱼块3
xiao吃了红烧鱼块1
郑师傅生产了红烧鱼块4
lin吃了红烧牛肉1
xiao吃了红烧鱼块2
康师傅生产了红烧牛肉3
xiao吃了红烧鱼块3
lin吃了红烧牛肉2
xiao吃了红烧鱼块4
康师傅生产了红烧牛肉4
lin吃了红烧牛肉3
lin吃了红烧牛肉4

注意:必须将消费者的rang(10)修改为5,否则程序会卡住。为什么呢?因为队列已经是空的,再取就会阻塞

这样才能解决供需平衡

那么问题来了,如果有一个消费者,吃的比较快呢?
再修改range值?太Low了
能者多劳嘛,
不能使用q.empty(),它是不准确的

看下图,有可能一开始,队列就空了
下面的0.1更快

python 全栈开发,Day40(进程间通信(队列和管道),进程间的数据共享Manager,进程池Pool)

 看下面的解决方案:

import time
import random
from multiprocessing import Process,Queue
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.random())  # 模拟生产时间
        print('{}生产了{}{}'.format(name,food,i))
        q.put('{}{}'.format(food,i))  # 放入队列

def consumer(q,name):
    while True:
        food = q.get()  # 获取队列
        if food == 'done':break  # 当获取的值为done时,结束循环
        time.sleep(random.random())  # 模拟吃的时间
        print('{}吃了{}'.format(name,food))

if __name__ == '__main__':
    q = Queue()  #创建队列对象,如果不提供maxsize,则队列数无限制
    p1 = Process(target=producer,args=[q,'康师傅','红烧牛肉'])
    p2 = Process(target=producer,args=[q,'郑师傅','红烧鱼块'])
    p1.start()  # 启动进程
    p2.start()
    Process(target=consumer,args=[q,'xiao']).start()
    Process(target=consumer, args=[q,'lin']).start()
    p1.join()  # 保证子进程结束后再向下执行
    p2.join()
    q.put('done')  # 向队列添加一个值done
    q.put('done')

执行输出:

康师傅生产了红烧牛肉0
郑师傅生产了红烧鱼块0
康师傅生产了红烧牛肉1
郑师傅生产了红烧鱼块1
xiao吃了红烧牛肉0
xiao吃了红烧牛肉1
康师傅生产了红烧牛肉2
康师傅生产了红烧牛肉3
xiao吃了红烧鱼块1
lin吃了红烧鱼块0
郑师傅生产了红烧鱼块2
lin吃了红烧牛肉3
xiao吃了红烧牛肉2
康师傅生产了红烧牛肉4
xiao吃了红烧牛肉4
lin吃了红烧鱼块2
郑师傅生产了红烧鱼块3
xiao吃了红烧鱼块3
郑师傅生产了红烧鱼块4
lin吃了红烧鱼块4

 

为什么要有2个done?因为有2个消费者
为什么要有2个join?因为必须要等厨师做完菜才可以。

最后输出2个done,表示通知2个顾客,菜已经上完了,顾客要结账了。

2个消费者,都会执行break。通俗的来讲,亲,您一共消费了xx元,请付款!

 

上面的解决方案,代码太长了,有一个消费者,就得done一次。

下面介绍JoinableQueue

JoinableQueue([maxsize])                                                                        

创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 

JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:

q.task_done() 
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

q.join() 
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。
方法介绍

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2021-06-24
  • 2022-01-15
  • 2021-09-16
  • 2022-12-23
  • 2021-11-19
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-10-20
  • 2021-11-20
相关资源
相似解决方案