xsj1

生产者消费者模型当中有两大类重要的角色,一个是生产者(负责造数据的任务),另一个是消费者(接收造出来的数据进行进一步的操作)。

 

为什么要使用生产者消费者模型?

在并发编程中,如果生产者处理速度很快,而消费者处理速度比较慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,
如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个等待的问题,就引入了生产者与消费者模型。让它们之间可以不停的生产和消费。

 

实现生产者消费者模型三要素:

 1、生产者

 2、消费者

 3、队列(或其他的容哭器,但队列不用考虑锁的问题)

 

什么时候用这个模型?

程序中出现明显的两类任务,一类任务是负责生产,另外一类任务是负责处理生产的数据的(如爬虫)

 

 

用该模型的好处?

1、实现了生产者与消费者的解耦和

2、平衡了生产力与消费力,就是生产者一直不停的生产,消费者可以不停的消费,因为二者不再是直接沟通的,而是跟队列沟通的。

 

来简单的写一个生产者消费者模型:

import time
import random
from multiprocessing import Queue,Process
 
def consumer(name,q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(\'消费者》》%s 准备开吃%s。\'%(name,res))
 
def producer(name,q):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res=\'大虾%s\'%i
        q.put(res)
        print(\'生产者》》》%s 生产了%s\'%(name,res))
 
if __name__ == \'__main__\':
    q=Queue()#一个队列
 
    p1=Process(target=producer,args=(\'monicx\',q))
    c1=Process(target=consumer,args=(\'lili\',q))
 
    p1.start()
    c1.start()

 

运行效果:

 

 

现在确实让生产者不停的生产,消费者不断的消费。,但此时有一个问题就是主进程没有结束。原因是:生产者p1生产完后就结束了,
但是消费者c1,在q.get()取空之后,就一直在原地等待。解决这个问题无非就让生产者在生产完毕后,就再往队列中发送一个结束信号,
这样消费者接收到结束信号后就可以跳出死循环。

 

import time
import random
from multiprocessing import Queue,Process

def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print(\'消费者》》%s 准备开吃%s。\'%(name,res))

def producer(name,q):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res=\'大虾%s\'%i
        q.put(res)
        print(\'生产者》》》%s 生产了%s\'%(name,res))

if __name__ == \'__main__\':
    q=Queue()#一个队列

    p1=Process(target=producer,args=(\'monicx\',q))
    c1=Process(target=consumer,args=(\'lili\',q))

    p1.start()
    c1.start()
    p1.join()
    q.put(None)

 

运行结果:

 

 

虽然解决了问题,但是这种的解决方式,在有多个生产者和多个消费者时,有几个消费者就要发送几次的结束信号,不然程序就不会停止。相当的low。就像这样:

 

import time
import random
from multiprocessing import Queue,Process

def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print(\'消费者》》%s 准备开吃%s。\'%(name,res))

def producer(name,q):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res=\'大虾%s\'%i
        q.put(res)
        print(\'生产者》》》%s 生产了%s\'%(name,res))

if __name__ == \'__main__\':
    q=Queue()#实例一个队列

    p1=Process(target=producer,args=(\'monicx1\',q))
    p2=Process(target=producer,args=(\'monicx2\',q))

    c1=Process(target=consumer,args=(\'lili1\',q))
    c2=Process(target=consumer,args=(\'lili2\',q))
    c3=Process(target=consumer,args=(\'lili3\',q))

    p1.start()
    p2.start()

    c1.start()
    c2.start()
    c3.start()

    p1.join()
    p2.join()
    q.put(None)
    q.put(None)
    q.put(None)

 

我们的思路就是发送结束信号而已,有另外一种队列提供了这种机制。

 

JoinableQueue([maxsize])

这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被处理。通知进程是使用共享的信号和条件变量来实现的。

其中maxsize是队列中允许最大项数,省略则无大小限制。

JoinableQueue的实例q除了与Queue对象相同的方法之外还具有:

task_done():消费者用此方法发出信息,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将会导致ValueError异常。

join():生产者调用此方法进行阻塞,直到队列中所有的项目都被处理了。

 

import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue
 
def consumer(name,q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(\'\033[43m消费者》》%s 准备开吃%s\033[0m\'%(name,res))
        q.task_done()#发送信号给生产者的q.join()说,已经处理完从队列中拿走的一个项目
 
def producer(name,q):
    for i in range(5):
        time.sleep(random.randint(1,2))#模拟生产时间
        res=\'大虾%s\'%i
        q.put(res)
        print(\'\033[41m生产者》》》%s 生产了%s\033[0m\'%(name,res))
    q.join()#等到消费者把自己放入队列中的所有项目都取走处理完后调用task_done()之后,生产者才能结束
 
if __name__ == \'__main__\':
    q=JoinableQueue()#实例一个队列
 
    p1=Process(target=producer,args=(\'monicx1\',q))
    p2=Process(target=producer,args=(\'monicx2\',q))
 
    c1=Process(target=consumer,args=(\'lili1\',q))
    c2=Process(target=consumer,args=(\'lili2\',q))
    c3=Process(target=consumer,args=(\'lili3\',q))
    c1.daemon=True
    c2.daemon=True
    c3.daemon=True
 
    p1.start()
    p2.start()
 
    c1.start()
    c2.start()
    c3.start()
 
    p1.join()
    p2.join()

 

运行结果:

 

分类:

技术点:

相关文章:

  • 2021-08-13
  • 2021-12-18
  • 2021-12-18
  • 2021-12-18
  • 2021-12-02
  • 2021-10-12
  • 2021-10-12
  • 2021-10-12
猜你喜欢
  • 2022-01-07
  • 2021-12-18
  • 2021-05-27
  • 2021-10-12
相关资源
相似解决方案