【问题标题】:Knowing when you've read everything off a multiprocessing Queue知道何时从多处理队列中读取所有内容
【发布时间】:2023-02-10 15:14:04
【问题描述】:

我有一些代码可以将工作分配给任务。任务将它们的结果放在队列中,主线程从队列中读取这些结果并进行处理。

from multiprocessing import Process, Queue, Pool, Manager
import uuid


def handle_task(arg, queue, end_marker):
    ... add some number of results to the queue . . .
    queue.put(end_marker)

def main(tasks):
    manager = Manager()
    queue = manager.Queue()
    count = len(tasks)
    end_marker = uuid.uuid4()
    with Pool() as pool:
        pool.starmap(handle_task, ((task, queue, end_marker) for task in tasks))
        while count > 0:
            value = queue.get()
            if value == end_marker:
                count -= 1
            else:
                ... deal with value ...

这段代码有效,但它非常笨拙和不优雅。如果tasks 是一个迭代器呢?为什么我需要提前知道有多少任务并跟踪每个任务。

是否有一种更简洁的方法来读取队列并知道将写入该线程的每个进程都已完成,并且您已经阅读了他们编写的所有内容?

【问题讨论】:

    标签: python python-3.x multiprocessing queue


    【解决方案1】:

    将对队列的访问移到池的工作管理器之外。如果你这样做,那么你就知道,根据定义,子流程已经全部终止。这也意味着不需要任何类型的结束标记。

    这是此模式的人为示例:

    from multiprocessing import Pool, Manager
    from random import randint
    
    def process(n, q):
        for x in range(randint(1, 10)):
            q.put((n, x))
    
    def main():
        with Manager() as manager:
            queue = manager.Queue()
            with Pool() as pool:
                pool.starmap(process, [(n, queue) for n in range(5)])
            while not queue.empty():
                print(queue.get())
    
    if __name__ == '__main__':
        main()
    

    示例输出:

    (0, 0)
    (1, 0)
    (0, 1)
    (2, 0)
    (0, 2)
    (0, 3)
    (2, 1)
    (3, 0)
    (0, 4)
    (2, 2)
    (3, 1)
    (3, 2)
    (4, 0)
    (4, 1)
    (4, 2)
    (4, 3)
    (4, 4)
    (4, 5)
    (4, 6)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-12-03
      • 2012-01-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多