【问题标题】:How to get chunks of elements from a queue?如何从队列中获取大量元素?
【发布时间】:2015-10-02 07:15:15
【问题描述】:

我有一个queue,我需要从中获取 10 个条目的块并将它们放入一个列表中,然后对其进行进一步处理。下面的代码有效(在示例中,“进一步处理”只是将列表打印出来)。

import multiprocessing

# this is an example of the actual queue
q = multiprocessing.Queue()
for i in range(22):
    q.put(i)
q.put("END")

counter = 0
mylist = list()
while True:
    v = q.get()
    if v == "END":
        # outputs the incomplete (< 10 elements) list
        print(mylist)
        break
    else:
        mylist.append(v)
        counter += 1
        if counter % 10 == 0:
            print(mylist)
            # empty the list
            mylist = list()

# this outputs
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
# [20, 21]

这段代码很难看。我不知道如何改进它 - 我前段时间阅读了how to use iter with a sentinel,但没有看到我的问题如何利用它。

有没有更好(= 更优雅/pythonic)的方法来解决这个问题?

【问题讨论】:

    标签: python sentinel


    【解决方案1】:

    在 Python 3.8 中,您可以使用海象运算符。

    import threading
    from itertools import islice
    from queue import Queue
    
    def producer(queue, _end):
        for i in range(10, 100):
            queue.put(i)
        else:
            queue.put(_end)
    
    def consumer(queue, _end):
        iterator = iter(queue.get, _end)
        # Consumes the queue in batches
        while batch := list(islice(iterator, 10)):
            print(batch)
    
    def main():
    
        queue = Queue()
        _end = object()
    
        t1 = threading.Thread(target=consumer, args=(queue, _end))
        t2 = threading.Thread(target=producer, args=(queue, _end))
    
        t1.start()
        t2.start()
    
        t2.join()
        t1.join()
    
    if __name__ == '__main__':
        main()
    
    In [2]: main()                                                                  
    [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
    [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
    [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
    [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
    [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
    [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
    [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
    [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
    

    【讨论】:

      【解决方案2】:

      您可以使用iter 两次:iter(q.get, 'END') 返回一个迭代器,该迭代器可以迭代队列中的值,直到'END'q.get() 返回。

      那么你可以使用the grouper recipe

      iter(lambda: list(IT.islice(iterator, 10)), [])
      

      将迭代器分组为 10 个项目的块。

      import itertools as IT
      import multiprocessing as mp
      
      q = mp.Queue()
      for i in range(22):
          q.put(i)
      q.put("END")
      
      iterator = iter(q.get, 'END')
      for chunk in iter(lambda: list(IT.islice(iterator, 10)), []):
          print(chunk)
      

      产量

      [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
      [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
      [20, 21]
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-07-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-06-10
        相关资源
        最近更新 更多