【问题标题】:When does multiprocessing's queue.get() return DONE?多处理的 queue.get() 什么时候返回 DONE?
【发布时间】:2015-08-24 21:56:37
【问题描述】:

我正在学习 Python 多处理模块,我找到了 this 示例:

from multiprocessing import Process, Queue
import time

def reader(queue):
    ## Read from the queue
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        queue = Queue()   # reader() reads from queue
                          # writer() writes to queue
        reader_p = Process(target=reader, args=((queue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader() as a separate python process

        _start = time.time()
        writer(count, queue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print "Sending %s numbers to Queue() took %s seconds" % (count, 
            (time.time() - _start))

我想知道queue.get() 什么时候会返回DONE,所以我尝试了以下示例:

#!/bin/env python
from multiprocessing import Process, Queue
import time

if __name__=='__main__':
  queue = Queue()

  print "Before 2x put"
  queue.put(10)
  queue.put(20)
  print "After 2x put"

  print "Before 1s get"
  print queue.get()
  print "After 1st get"

  print "Before 2nd get"
  print queue.get()
  print "After 2nd get"

  print "Before 3rd get"
  print queue.get()
  print "After 3rd get"

此脚本的最后一条消息是Before 3rd get,此后脚本卡住了,结束它的唯一方法是终止它。从这个例子中你可以看到queue.get() 是阻塞的(代码在它结束之前不会继续)。怎么可能在原代码queue.get()返回DONE,出现这种情况?

编辑

回复@KemyLand 很好地解释了这里发生了什么,这是没有卡住的版本:

#!/bin/env python
from multiprocessing import Process, Queue
import time

if __name__=='__main__':
  queue = Queue()

  print "Before 2x put"
  queue.put(10)
  queue.put(20)
  print "After 2x put"

  print "Before 1s get"
  print queue.get()
  print "After 1st get"

  print "Before 2nd get"
  print queue.get()
  print "After 2nd get"

  print "Before 3rd get"
  try:
    print queue.get_nowait()
    print "After 3rd get"
  except:
    pass

【问题讨论】:

    标签: python multithreading multiprocessing python-multiprocessing


    【解决方案1】:

    这很简单。

    在您的第一个代码中,readerwriter 之间达成的“协议”是 writerreader 发送任意数量的数据,然后 writer 发送“DONE”,@ 987654326@收到,明白数据传输完成。

    在您的第二个代码中,readerwriter 之间没有约定的协议,因为作者的观点是“我发送 两个对象,我完成了!”,而读者的观点是“*我收到 三个 个对象,我完成了!”。

    由于运行时环境的任何部分都无法检测到何时发生协议错误,因此应用程序只是阻塞,等待永远不会到来的数据。唯一可以检测到这种情况的是应用程序本身,因为它是唯一知道它遵循的协议的人。为此,您可以使用Queue.Queue.get_nowait()(您必须 import Queue(大写Q),因为multiprocessing.Queue 只是Queue.Queue 的别名)。如果这样的函数不能间接Queue 中提取对象,它将抛出Queue.Empty 异常。 (注意:模块名称的混乱在 Python 3 中已修复)。

    【讨论】:

    • 哦,这样的学者错误。没有意识到作者发送了DONE。老实说,我并没有完全理解你所说的关于导入的内容。我已经修改了我的问题以包括工作解决方案,请检查一下。非常感谢。
    • @WakanTanka:当然是对的!如果您还有其他问题,请告诉我们:)。
    • @KemmyLand,我正在学习多处理模块,所以我有很多问题会持续发布;)如果您有兴趣,请查看我的个人资料,在过去 24 小时内有几个问题,到目前为止没有人把我推向正确的方向。非常感谢。
    • @WakanTanka:一会儿我会去你的个人资料。这对我来说是一个晚上有用的东西的好方法:)。
    猜你喜欢
    • 2016-09-11
    • 2018-05-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-25
    • 2010-11-01
    • 2014-11-04
    • 1970-01-01
    相关资源
    最近更新 更多