【问题标题】:python multiprocessing - process hangs on join for large queuepython multiprocessing - 进程挂起加入大队列
【发布时间】:2014-03-05 16:47:12
【问题描述】:

我正在运行 python 2.7.3,我注意到以下奇怪的行为。考虑这个最小的例子:

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        qout.put({'bar': bar})

if __name__ == '__main__':
    import sys

    qin = Queue()
    qout = Queue()
    worker = Process(target=foo,args=(qin,qout))
    worker.start()

    for i in range(100000):
        print i
        sys.stdout.flush()
        qin.put(i**2)

    qin.put(None)
    worker.join()

当我循环超过 10,000 个或更多时,我的脚本会挂在 worker.join() 上。当循环仅达到 1,000 时,它工作正常。

有什么想法吗?

【问题讨论】:

标签: python process queue multiprocessing


【解决方案1】:

子进程中的qout 队列已满。您从foo() 放入的数据不适合内部使用的操作系统管道的缓冲区,因此子进程会阻止尝试容纳更多数据。但是父进程并没有读取这些数据:它也只是被阻塞了,等待子进程完成。这是典型的死锁。

【讨论】:

  • 如果您还提供了该问题的代码解决方案,那就太好了。 IE。如何清空缓冲区使子进程不阻塞。
【解决方案2】:

队列的大小必须有限制。考虑以下修改:

from multiprocessing import Process, Queue

def foo(qin,qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        #qout.put({'bar':bar})

if __name__=='__main__':
    import sys

    qin=Queue()
    qout=Queue()   ## POSITION 1
    for i in range(100):
        #qout=Queue()   ## POSITION 2
        worker=Process(target=foo,args=(qin,))
        worker.start()
        for j in range(1000):
            x=i*100+j
            print x
            sys.stdout.flush()
            qin.put(x**2)

        qin.put(None)
        worker.join()

    print 'Done!'

这按原样工作(qout.put 行已注释掉)。如果您尝试保存所有 100000 个结果,则 qout 变得太大:如果我取消注释掉 foo 中的 qout.put({'bar':bar}),并在 POSITION 1 中保留 qout 的定义,则代码挂起。但是,如果我将 qout 定义移动到位置 2,则脚本完成。

简而言之,您必须小心qinqout 都不会变得太大。 (另见:Multiprocessing Queue maxsize limit is 32767

【讨论】:

    【解决方案3】:

    当我尝试将字符串放入总大小约为 5000 cahrs 的队列中时,我在 python3 上遇到了同样的问题。

    在我的项目中,有一个主机进程设置队列并启动子进程,然后加入。 Afrer join 主机进程从队列中读取。当子进程产生过多数据时,主机挂起join。我使用以下函数修复了此问题,以等待主机进程中的子进程:

    from multiprocessing import Process, Queue
    from queue import Empty
    
    def yield_from_process(q: Queue, p: Process):
        while p.is_alive():
            p.join(timeout=1)
            while True:
                try:
                    yield q.get(block=False)
                except Empty:
                    break
    

    我在队列填满后立即从队列中读取,因此它永远不会变得很大

    【讨论】:

    • 什么是Empty
    【解决方案4】:

    在池关闭后,我试图 .get() 一个异步工作者

    with 块外的缩进错误

    我有这个

    with multiprocessing.Pool() as pool:
        async_results = list()
        for job in jobs:
            async_results.append(
                pool.apply_async(
                    _worker_func,
                    (job,),
                )
            )
    # wrong
    for async_result in async_results:
        yield async_result.get()
    

    我需要这个

    with multiprocessing.Pool() as pool:
        async_results = list()
        for job in jobs:
            async_results.append(
                pool.apply_async(
                    _worker_func,
                    (job,),
                )
            )
        # right
        for async_result in async_results:
            yield async_result.get()
    

    【讨论】:

    • 你能详细说明一下吗?我想在with 块之后使用get() 的结果。
    • @MSS,退出 with 块会破坏池,这意味着所有作业结果都将被删除,因此您可以将它们复制到存在于 with 块之外的变量/列表中,或者您可以使用它们在 with 块内(或者根本不使用 with 块,完成后手动关闭池)
    猜你喜欢
    • 2017-10-19
    • 2015-05-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-28
    • 2018-10-09
    • 1970-01-01
    相关资源
    最近更新 更多