【问题标题】:Starting a large number of async processes with multiprocessing使用多处理启动大量异步进程
【发布时间】:2013-11-18 02:10:30
【问题描述】:

如果我调用 apply_async 10,000 次,假设 OOM-killer 不会干扰,多处理会同时启动它们,还是分批启动它们。比如..每100次启动,等90次启动完毕再启动?

达斯汀

【问题讨论】:

    标签: python multiprocessing high-volume


    【解决方案1】:

    apply_async()multiprocessing.Pool 对象的方法,并将所有工作交付给您在创建Pool 时指定的进程数。只有那么多任务可以同时运行。其余的由多处理机制保存在队列(或管道)中,并在进程完成已分配的任务时自动分配给进程。 所有您提供多个工作项的Pool 方法也是如此。

    再澄清一点:apply_async 不会创建或启动任何进程。这些进程是在您调用Pool() 时创建的。这些进程只是坐在那里等待,直到您调用要求完成一些实际工作的Pool 方法(如apply_async())。

    示例

    玩这个:

    MAX = 100000
    
    from time import sleep
    def f(i):
        sleep(0.01)
        return i
    
    def summer(summand):
        global SUM, FINISHED
        SUM += summand
        FINISHED += 1
    
    if __name__ == "__main__":
        import multiprocessing as mp
        SUM = 0
        FINISHED = 0
        pool = mp.Pool(4)
    
        print "queuing", MAX, "work descriptions"
        for i in xrange(MAX):
            pool.apply_async(f, args=(i,), callback=summer)
            if i % 1000 == 0:
                print "{}/{}".format(FINISHED, i),
        print
    
        print "closing pool"
        pool.close()
    
        print "waiting for processes to end"
        pool.join()
    
        print "verifying result"
        print "got", SUM, "expected", sum(xrange(MAX))
    

    输出如下:

    queuing 100000 work descriptions
    0/0 12/1000 21/2000 33/3000 42/4000
    ... stuff chopped for brevity ...
    1433/95000 1445/96000 1456/97000 1466/98000 1478/99000
    closing pool
    waiting for processes to end
    ... and it waits here "for a long time" ...
    verifying result
    got 4999950000 expected 4999950000
    

    您可以通过观察它的行为来回答大部分问题。工作项快速排队。当我们看到“关闭池”时,所有工作项都已排队,但 1478 个已经完成,大约 98000 个仍在等待某个进程处理它们。

    如果您从f() 中取出sleep(0.01),则它的揭示性要小得多,因为结果返回的速度几乎与工作项排队的速度一样快。

    不过,无论您如何运行它,内存使用仍然是微不足道的。这里的工作项(函数的名称 ("f") 及其腌制整数参数)很小。

    【讨论】:

    • 更具体地说,我问的是,如果我调用 apply_async N 次(其中 N 很大),是否会尽快分配工作或以某种方式限制工作?
    • 我想我已经回答了 ;-) 有什么不清楚的地方?将尽可能快地分配工作,并且它受到进程计算结果的速度的限制。不然怎么可能?如果您有P 进程,则第一个P 异步调用将尽快开始。在第一个 P 异步调用完成之前,没有其他人会开始。然后剩下的一个N async 调用将被分配给完成其第一个任务的进程。等等等等。我认为你的心智模型可能方式过于复杂。这真的很简单:-)
    • 我只是在询问起始方面。您提到您相信池“将所有工作交付给您在创建池时指定的进程数量”,但仍然留有限制空间。
    • 对不起,我想我真的不知道您所说的“开始”或“节流”是什么意思 ;-) N 函数的名称,以及它们的泡菜参数(如果有的话)立即放入队列(或馈送到管道)。这都是由主程序完成的。因此,所有N一些 工作立即完成 - 但只有一小部分,都与实现开销有关。 P 进程从队列/管道中提取该信息,每个进程一次处理一项任务,并一次完成一项真正的工作。这是“节流”吗?不是对我,但也许对你。
    • @DustinOprea,请参阅我刚才的编辑以获取您可以使用的代码。
    猜你喜欢
    • 1970-01-01
    • 2015-05-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-03-30
    • 1970-01-01
    • 1970-01-01
    • 2018-06-05
    相关资源
    最近更新 更多