【Question title】: Python iterable Queue
【Posted】: 2012-07-02 05:26:46
【Question】:

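I need to know when a queue has been closed and will receive no more items, so that I can end the iteration.

I do this by putting a sentinel into the queue: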

from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return self

    def close(self):
        self.put(self._sentinel)

    def next(self):
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        else:
            return item
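The class above targets Python 2 (the `Queue` module and the `next` iterator method). A minimal Python 3 sketch of the same idea, assuming a single consumer, only renames the module to `queue` and the method to `__next__`; the `produce` helper in the usage part is illustrative:

```python
import queue
import threading

class IterableQueue(queue.Queue):
    """A Queue that can be iterated until close() is called."""

    _sentinel = object()  # unique marker; cannot collide with a real item

    def __iter__(self):
        return self

    def __next__(self):
        item = self.get()  # blocks until an item (or the sentinel) arrives
        if item is self._sentinel:
            raise StopIteration
        return item

    def close(self):
        self.put(self._sentinel)

# Usage: a producer thread fills the queue, the main thread iterates over it.
def produce(q):
    for i in range(5):
        q.put(i)
    q.close()

q = IterableQueue()
threading.Thread(target=produce, args=(q,)).start()
received = list(q)
print(received)  # [0, 1, 2, 3, 4]
```

Because the consumer removes the sentinel from the queue, only one consumer ever sees it; with several consumers, each one needs its own sentinel (or the sentinel has to be put back).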

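Given that this is such a common use of a queue, isn't there a built-in implementation?

【Comments】:

  • I use either a sentinel or a flag checked by the threads to stop iterating over the queue. For the latter, I usually wait with a timeout.

Tags: python queue iteration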


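【Solution 1】:

A sentinel is a reasonable way for the producer to signal that no more queue tasks are coming.

FWIW, your code can be simplified considerably with the two-argument form of iter():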

from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)
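The two-argument `iter(callable, sentinel)` form works the same way in Python 3. One detail the class above leaves implicit: each consumer stops after taking a single sentinel, so a producer feeding several consumers has to put one sentinel per consumer. A minimal sketch; `SENTINEL`, `NUM_CONSUMERS` and `consume` are illustrative names, not part of any library:

```python
import queue
import threading

SENTINEL = object()   # unique end-of-stream marker
NUM_CONSUMERS = 3     # illustrative value

def consume(q, results, lock):
    # iter(q.get, SENTINEL) calls q.get() repeatedly and stops as soon as
    # the returned value compares equal to SENTINEL (identity for object())
    for item in iter(q.get, SENTINEL):
        with lock:
            results.append(item)

q = queue.Queue()
results = []
lock = threading.Lock()
consumers = [threading.Thread(target=consume, args=(q, results, lock))
             for _ in range(NUM_CONSUMERS)]
for t in consumers:
    t.start()

for i in range(100):
    q.put(i)
for _ in range(NUM_CONSUMERS):  # one sentinel per consumer
    q.put(SENTINEL)

for t in consumers:
    t.join()
print(sorted(results) == list(range(100)))  # True
```

Since the queue is FIFO and the sentinels are put after all real items, every item is consumed before any consumer stops, and no consumer can take two sentinels because it stops calling `get()` after the first.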

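【Discussion】:

    【Solution 2】:

    The multiprocessing module has its own version of Queue, which includes a close method. I'm not sure how it behaves with threads, but it's worth a try; I don't see why it shouldn't work the same way: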

    from multiprocessing import Queue
    
    q = Queue()
    q.put(1)
    q.get_nowait()
    # 1
    q.close()
    q.get_nowait()
    # ...
    # IOError: handle out of range in select()
    

    You can catch IOError as the close signal.

    Test:

    from multiprocessing import Queue
    from Queue import Empty  # raised by q.get() when the timeout expires
    from threading import Thread
    
    def worker(q):
        while True:
            try:
                item = q.get(timeout=.5)
            except IOError:
                print "Queue closed. Exiting thread."
                return
            except Empty:
                continue
            print "Got item:", item
    
    q = Queue()
    for i in xrange(3):
        q.put(i)
    t = Thread(target=worker, args=(q,))
    t.start()
    # Got item: 0
    # Got item: 1
    # Got item: 2
    q.close()
    # Queue closed. Exiting thread.
    

    Honestly, this is not much different from setting a flag on a Queue.Queue; multiprocessing.Queue just uses a closed file descriptor as the flag:

    from Queue import Queue, Empty
    from threading import Thread
    
    def worker2(q):
        while True:
            if q.closed:
                print "Queue closed. Exiting thread."
                return
            try:
                item = q.get(timeout=.5)
            except Empty:
                continue
            print "Got item:", item
    
    q = Queue()
    q.closed = False
    for i in xrange(3):
        q.put(i)
    t = Thread(target=worker2, args=(q,))
    t.start()
    # Got item: 0
    # Got item: 1
    # Got item: 2
    q.closed = True
    # Queue closed. Exiting thread.
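In Python 3 the ad hoc `q.closed` attribute can be replaced by a `threading.Event`, the standard library's thread-safe flag. A minimal sketch, assuming a single consumer that should still drain every item queued before the flag was set; `stop` and `worker3` are illustrative names:

```python
import queue
import threading

def worker3(q, stop, received):
    while True:
        # Check the flag *before* checking emptiness: once stop is set,
        # no more puts will happen, so an empty queue really means "done".
        if stop.is_set() and q.empty():
            return
        try:
            item = q.get(timeout=0.1)  # wake up periodically to re-check
        except queue.Empty:
            continue
        received.append(item)

q = queue.Queue()
stop = threading.Event()
received = []
t = threading.Thread(target=worker3, args=(q, stop, received))
t.start()
for i in range(3):
    q.put(i)
stop.set()        # "close" the queue
t.join()
print(received)   # [0, 1, 2]
```

Compared with a sentinel, the flag costs up to one timeout interval of latency, which is the trade-off behind "wait with a timeout" in the comment on the question.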
    

    【Discussion】:

      【Solution 3】:

      An old question, and variants of self._sentinel = object() will work. Revisiting this in 2021, I would suggest combining concurrent.futures with None as your sentinel:

      # Note: this is Python 3.8+ code

      import collections
      import queue
      import time
      import random
      from concurrent.futures import ThreadPoolExecutor

      def worker(tup):
          (q, i) = tup
          print(f"Starting thread {i}")
          partial_sum = 0
          numbers_added = 0
          while True:
              try:
                  item = q.get()
                  if item is None:
                      # 'propagate' this 'sentinel' to anybody else
                      q.put(None)
                      break
                  numbers_added += 1
                  partial_sum += item
                  # need to pretend that we're doing something asynchronous
                  time.sleep(random.random()/100)
              except Exception as e:
                  print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
                  break

          print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
          return partial_sum

      MAX_RANGE = 1024
      MAX_THREADS = 12

      # max_workers=MAX_THREADS so that all workers really run at the same time
      with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:

          # create a queue with numbers to add up
          (q := queue.Queue()).queue = collections.deque(range(MAX_RANGE))

          # kick off the threads
          future_partials = executor.map(worker, [(q, i) for i in range(MAX_THREADS)])

          # they'll be done more or less instantly, but we'll make them wait
          print("Threads launched with first batch ... sleeping 2 seconds")
          time.sleep(2)

          # threads are still available for more work!
          for i in range(MAX_RANGE):
              q.put(i)

          print("Finished giving them another batch, this time we're not sleeping")

          # now we tell them all to wrap it up
          q.put(None)
          # this will nicely collect the outputs
          total = sum(future_partials)
          print(f"Got total sum {total} (correct answer is {(MAX_RANGE-1)*MAX_RANGE})")

      # Starting thread 0
      # Starting thread 1
      # Starting thread 2
      # Starting thread 3
      # Starting thread 4
      # Starting thread 5
      # Starting thread 6
      # Starting thread 7
      # Starting thread 8
      # Starting thread 9
      # Starting thread 10
      # Starting thread 11
      # Threads launched with first batch ... sleeping 2 seconds
      # Finished giving them another batch, this time we're not sleeping
      # Thread 0 is done, saw a total of 175 numbers to add up
      # Thread 3 is done, saw a total of 178 numbers to add up
      # Thread 11 is done, saw a total of 173 numbers to add up
      # Thread 4 is done, saw a total of 177 numbers to add up
      # Thread 9 is done, saw a total of 169 numbers to add up
      # Thread 1 is done, saw a total of 172 numbers to add up
      # Thread 7 is done, saw a total of 162 numbers to add up
      # Thread 10 is done, saw a total of 161 numbers to add up
      # Thread 5 is done, saw a total of 169 numbers to add up
      # Thread 2 is done, saw a total of 157 numbers to add up
      # Thread 6 is done, saw a total of 169 numbers to add up
      # Thread 8 is done, saw a total of 186 numbers to add up
      # Got total sum 1047552 (correct answer is 1047552)


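      Note that the de facto "main thread" only has to push a single None into the queue, much like broadcasting on a condition variable: every thread picks it up (and propagates it).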
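The propagation trick does not depend on `concurrent.futures`. A stripped-down sketch with plain `threading.Thread` objects shows that one `None` put by the main thread stops every worker; `summing_worker`, `partials` and `NUM_WORKERS` are illustrative names:

```python
import queue
import threading

def summing_worker(q, partials, idx):
    total = 0
    while True:
        item = q.get()
        if item is None:
            q.put(None)  # pass the sentinel on to the next worker
            break
        total += item
    partials[idx] = total  # each worker writes only its own slot

NUM_WORKERS = 4
q = queue.Queue()
partials = [0] * NUM_WORKERS
workers = [threading.Thread(target=summing_worker, args=(q, partials, i))
           for i in range(NUM_WORKERS)]
for w in workers:
    w.start()
for n in range(1000):
    q.put(n)
q.put(None)  # a single sentinel for all workers
for w in workers:
    w.join()
print(sum(partials))  # 499500
```

Every worker takes the sentinel once and puts it back once, so exactly one `None` is left in the queue after all workers have exited.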

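      Also, this does not use the multiprocessing Queue, which is heavier than the standard (thread-safe) queue. The code above is also easy to adapt to ProcessPoolExecutor, or to a mix of both executors; in that case the queue has to be shareable between processes, e.g. one created with multiprocessing.Manager().Queue(), because a plain multiprocessing.Queue cannot be passed to pool workers as an argument.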

      (Side note: in general, if a "basic" problem needs a custom class in one generation of Python, newer versions often offer other options.)

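      (Another side note: the only reason the code requires Python 3.8+ is that I'm a fan of assignment expressions, which, in line with the side note above, solve the long-standing problem of initializing a queue from a list without resorting to a non-functional solution.)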
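The walrus line works by replacing the queue's `queue` attribute, which is not part of the documented `queue.Queue` API. It also skips the bookkeeping that `put()` does for `task_done()`/`join()`, so calling `task_done()` on such a queue fails. If that matters, a small helper gives the same pre-filled queue through `put_nowait()` only; `queue_from` is a hypothetical name, not a standard-library function:

```python
import queue

def queue_from(iterable, maxsize=0):
    """Build a queue.Queue pre-filled with the items of iterable."""
    q = queue.Queue(maxsize)
    for item in iterable:
        q.put_nowait(item)  # raises queue.Full if maxsize is too small
    return q

q = queue_from(range(5))
items = []
while not q.empty():
    items.append(q.get())
    q.task_done()  # works because put_nowait() counted each item
q.join()           # returns at once: every item has been marked done
print(items)       # [0, 1, 2, 3, 4]
```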

      【Discussion】:
