【问题标题】:Checking for empty Queue in python's multiprocessing在 python 的多处理中检查空队列
【发布时间】:2011-10-29 20:39:42
【问题描述】:

我有一个程序使用 python 的包多处理和队列。我的一个函数有这样的结构:

from multiprocessing import Process, Queue
def foo(queue):
   while True:
       try:
           a = queue.get(block = False)
           doAndPrintStuff(a)
       except:
           print "the end"
           break

   if __name__ == "__main__"
     nthreads = 4
     queue = Queue.Queue()
     # put stuff in the queue here 
     for stuff in moreStuff:
         queue.put(stuff)
     procs = [Process(target = foo, args = (queue,)) for i in xrange(nthreads)]
     for p in procs:
       p.start()
     for p in procs:
       p.join()

这个想法是,当我尝试从队列中提取并且它是空的时,它会引发异常并终止循环。所以我有两个问题:

1) 这是一个安全的习语吗?有没有更好的方法来做到这一点?

2) 我试图找出当我尝试从空队列中.get() 时引发的确切异常是什么。目前我的程序正在捕获所有异常,当错误出现在其他地方并且我只收到“结束”消息时,这很糟糕。

我试过了:

  import Queue
  queue = Queue.Queue()
  [queue.put(x) for x in xrange(10)]
  try: 
       print queue.get(block = False)
  except Queue.Empty:
       print "end"
       break

但我得到了错误,就好像我没有发现异常一样。要捕获的正确异常是什么?

【问题讨论】:

    标签: python queue multiprocessing


    【解决方案1】:

    尝试阅读queue 库文档。你不是在找Queue.empty()吗?

    【讨论】:

      【解决方案2】:

      例外应该是Queue.Empty。但是你确定你得到了 same 错误吗?在您的第二个示例中,您还将队列本身从multiprocessing.Queue 切换到Queue.Queue,我认为这可能是问题所在。

      这可能看起来很奇怪,但你必须使用multiprocessing.Queue 类,但使用Queue.Empty 异常(你必须从Queue 模块中自己导入)

      【讨论】:

      • 哼!谢谢!我不知道我必须从不同的包中导入东西。现在可以了。
      【解决方案3】:

      在刷新 put 缓冲区之前,队列似乎是空的,这可能需要一段时间。

      解决我们问题的方法是to usesentinels,或者可能是内置的task_done()调用:

      task_done()

      表示以前排队的任务已完成。由队列消费者线程使用。对于用于获取任务的每个 get(),后续 调用 task_done() 告诉队列任务的处理是 完成。

      如果 join() 当前处于阻塞状态,它将在处理完所有项目后恢复(意味着收到 task_done() 调用 每个被 put() 放入队列的项目)。

      如果调用次数超过队列中放置的项目数,则引发 ValueError。

      【讨论】:

        【解决方案4】:

        这是一个示例-正如@Steven 上面所说,您需要使用标准队列中的 queue.Empty 异常。文档中的注释 (https://docs.python.org/3/library/multiprocessing.html):

        注意

        多处理使用通常的 queue.Empty 和 queue.Full 异常 发出超时信号。它们在多处理中不可用 命名空间,因此您需要从队列中导入它们。

        基本示例:

         from multiprocessing import Process, Queue, Manager
         import queue
        
        def firstPass(q):
            driver = getDriver()
        
            while True:
                try:      
                    link = q.get_nowait()   
                    f(driver, link)
                except queue.Empty:
                    logger.info("empty queue")
                    driver.close()
                    break
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2022-12-09
          • 1970-01-01
          • 1970-01-01
          • 2018-06-30
          • 2012-07-11
          • 1970-01-01
          • 2015-10-18
          • 2013-06-18
          相关资源
          最近更新 更多