qjj19931230
  • 生产者消费者模型

  • 在并发编程中,比如爬虫,有的线程负责爬取数据,有的线程负责对爬取到的数据做处理(清洗、分类和入库)。假如他们是直接交互的,那么当二者的速度不匹配时势必出现等待现象,这也就产生了资源的浪费。
  • 抽象是一种很重要的通用能力,而生产者消费者模型是前人将一系列同类型的具体的问题抽象出来的一个一致的最佳解决方案。
  • 该模型有三个重要角色,容器,生产者和消费者,
  • 顾名思义,生产者就是负责生产数据或任务的,消费者就是负责消费数据或者任务的(下文统称为任务),容器是二者进行通讯的媒介。
  • 在该模型中,生产者和消费者不在直接进行通讯,而是通过引入一个第三者容器(通常都是用阻塞队列)来达到解耦的目的。
  • 这样生产者不必在因为消费者速度过慢而等待,直接将任务放入容器即可,消费者也不必因生产者生产速度过慢而等待,直接从容器中获取任务,以此达到了资源的最大利用。
  • 简易版

  • 我们先写一个单生产者和单消费者的简易版生产者消费者模型。
  •  1 import threading
     2 import time
     3 import queue
     4 
     5 def consume(thread_name, q):
     6     while True:
     7         time.sleep(2)
     8         product = q.get()
     9         print("%s consume %s" % (thread_name, product))
    10 
    11 def produce(thread_name, q):
    12     for i in range(3):
    13         product = \'product-\' + str(i)
    14         q.put(product)
    15         print("%s produce %s" % (thread_name, product))
    16         time.sleep(1)
    17     
    18             
    19 q = queue.Queue()
    20 p = threading.Thread(target=produce, args=("producer",q))
    21 c = threading.Thread(target=consume, args=("consumer",q))
    22 
    23 p.start()
    24 c.start()
    25 
    26 p.join()
    27 
    28 # 输出如下
    29 producer produce product-0
    30 producer produce product-1
    31 consumer consume product-0
    32 producer produce product-2
    33 consumer consume product-1
    34 consumer consume product-2
    35 ...

     

  • 最佳实践

  • 我们可以结合队列的内置函数 task_done() 和 join() 来达到我们的目的。
  • join() 函数是阻塞的。当消费者通过 get() 从队列获取一项任务并处理完成之后,需要调用且只可以调用一次 task_done(),该方法会给队列发送一个信号,join()函数则在监听这个信号。
  • 可以简单理解为队列内部维护了一个计数器,该计数器标识未完成的任务数,每当添加任务时,计数器会增加,调用 task_done()时计数器则会减少,直到队列为空。而join() 就是在监听队列是否为空,一旦条件满足则结束阻塞状态。
  •  1 import threading
     2 import time
     3 import queue
     4 
     5 def consume(thread_name, q):
     6     while True:
     7         time.sleep(2)
     8         product = q.get()
     9         print("%s consume %s" % (thread_name, product))
    10         q.task_done()
    11 
    12 def produce(thread_name, q):
    13     for i in range(3):
    14         product = \'product-\' + str(i)
    15         q.put(product)
    16         print("%s produce %s" % (thread_name, product))
    17         time.sleep(1)
    18     q.join()
    19             
    20 q = queue.Queue()
    21 p = threading.Thread(target=produce, args=("producer",q))
    22 c = threading.Thread(target=consume, args=("consumer",q))
    23 c1 = threading.Thread(target=consume, args=("consumer-1",q))
    24 
    25 c.setDaemon(True)
    26 c1.setDaemon(True)
    27 p.start()
    28 c.start()
    29 c1.start()
    30 
    31 p.join()
    32 
    33 # 输出如下
    34 producer produce product-0
    35 producer produce product-1
    36 consumer-1 consume product-0
    37 consumer consume product-1
    38 producer produce product-2
    39 consumer consume product-2

     

  • 上述示例中,我们将消费者线程设置为守护线程,这样当主线程结束时消费者线程也会一并结束。然后主线程最后一句 p.join() 又表示主线程必须等待生产者线程结束后才可以结束。
  • 再细看生产者线程的主函数 produce(),该函数中出现了我们上面说过的 q.join() 函数。而 task_done 则是在消费者线程的主函数中调用的。故当生产者线程生产完所有任务后就会被阻塞,只有当消费者线程处理完所有任务后生产者才会阻塞结束。随着生产者线程的结束,主线程也一并结束,守护线程消费者线程也一并结束,自此所有线程均安全退出。
  • Queue 总结

  • 本章节介绍了队列的高级应用,从简易版的示例到最佳实践,介绍了生产者消费者模型的基本用法,在该模型中,队列扮演了非常重要的角色,起到了解耦的目的。
  • 本模型有固定的步骤,其中最重要的就是通过 task_done() 和 join() 来互相通信。task_done() 仅仅用来通知队列消费者已完成一个任务,至于任务是什么它毫不关心,它只关心队列中未完成的任务数量。
  • 注意:task_done() 不可以在 put() 之前调用,否则会引发 ValueError: task_done() called too many times。同时在处理完任务后只可以调用一次该函数,否则队列将不能准确计算未完成任务数量。

分类:

技术点:

相关文章: