【问题标题】:multiprocessing.Pool: When to use apply, apply_async or map?multiprocessing.Pool:何时使用 apply、apply_async 或 map?
【发布时间】:2012-01-21 22:18:50
【问题描述】:

我还没有看到Pool.applyPool.apply_asyncPool.map 用例的明确示例。我主要用Pool.map;别人的优点是什么?

【问题讨论】:

    标签: python multithreading concurrency multiprocessing


    【解决方案1】:

    在 Python 的旧时代,要调用带有任意参数的函数,您可以使用 apply

    apply(f,args,kwargs)
    

    apply 在 Python2.7 中仍然存在,但在 Python3 中没有,一般不再使用。如今,

    f(*args,**kwargs)
    

    是首选。 multiprocessing.Pool 模块尝试提供类似的接口。

    Pool.apply 类似于 Python apply,只是函数调用是在单独的进程中执行的。 Pool.apply 阻塞直到函数完成。

    Pool.apply_async 也类似于 Python 内置的 apply,只是调用立即返回而不是等待结果。返回一个AsyncResult 对象。您调用它的get() 方法来检索函数调用的结果。 get() 方法阻塞,直到函数完成。因此,pool.apply(func, args, kwargs) 等价于pool.apply_async(func, args, kwargs).get()

    Pool.apply 相比,Pool.apply_async 方法还有一个回调,如果提供了该回调,则会在函数完成时调用。这可以用来代替调用get()

    例如:

    import multiprocessing as mp
    import time
    
    def foo_pool(x):
        time.sleep(2)
        return x*x
    
    result_list = []
    def log_result(result):
        # This is called whenever foo_pool(i) returns a result.
        # result_list is modified only by the main process, not the pool workers.
        result_list.append(result)
    
    def apply_async_with_callback():
        pool = mp.Pool()
        for i in range(10):
            pool.apply_async(foo_pool, args = (i, ), callback = log_result)
        pool.close()
        pool.join()
        print(result_list)
    
    if __name__ == '__main__':
        apply_async_with_callback()
    

    可能会产生如下结果

    [1, 0, 4, 9, 25, 16, 49, 36, 81, 64]
    

    请注意,与pool.map 不同,结果的顺序可能与进行pool.apply_async 调用的顺序不一致。


    因此,如果您需要在单独的进程中运行函数,但希望当前进程阻塞直到该函数返回,请使用Pool.apply。与Pool.apply 一样,Pool.map 会一直阻塞,直到返回完整的结果。

    如果您希望工作进程池异步执行许多函数调用,请使用Pool.apply_async。结果的顺序不保证与Pool.apply_async的调用顺序相同。

    另请注意,您可以使用Pool.apply_async 调用许多不同的函数(并非所有调用都需要使用相同的函数)。

    相比之下,Pool.map 将相同的函数应用于许多参数。 但是,与Pool.apply_async 不同的是,结果的返回顺序与参数的顺序相对应。

    【讨论】:

    • 在Windows上apply_async_with_callback()之前应该有if __name__=="__main__"吗?
    • 查看multiprocessing/pool.py 内部,您会看到Pool.map(func,iterable) 等价于Pool.map_async(func,iterable).get()。所以Pool.mapPool.map_async之间的关系类似于Pool.applyPool.apply_asyncasync 命令立即返回,而非async 命令阻塞。 async 命令也有回调。
    • 在使用Pool.mapPool.apply 之间做出决定类似于在Python 中决定何时使用mapapply。您只需使用适合工作的工具。决定使用async 和非async 版本取决于您是否希望调用阻止当前进程和/或是否要使用回调。
    • @falsePockets:是的。每次调用 apply_async 都会返回一个 ApplyResult 对象。调用ApplyResultget 方法将返回关联函数的返回值(如果调用超时,则引发mp.TimeoutError。)因此,如果您将ApplyResults 放在有序列表中,然后调用它们的@ 987654375@ 方法将以相同的顺序返回结果。但是,在这种情况下,您可以只使用 pool.map
    • @galactica:每次worker函数成功结束(不引发异常),在主进程中调用回调函数。工作函数将返回值放入队列中,主进程中的pool._result_handler线程一次处理一个返回值,将返回值传递给回调函数。所以你可以保证每个返回值都会调用一次回调函数,并且这里没有并发问题,因为回调是由主进程中的单个线程顺序调用的。
    【解决方案2】:

    这里是一个表格格式的概述,以显示Pool.applyPool.apply_asyncPool.mapPool.map_async 之间的区别。在选择一个时,您必须考虑多参数、并发、阻塞和排序:

                      | Multi-args   Concurrence    Blocking     Ordered-results
    ---------------------------------------------------------------------
    Pool.map          | no           yes            yes          yes
    Pool.map_async    | no           yes            no           yes
    Pool.apply        | yes          no             yes          no
    Pool.apply_async  | yes          yes            no           no
    Pool.starmap      | yes          yes            yes          yes
    Pool.starmap_async| yes          yes            no           no
    

    注意事项:

    • Pool.imapPool.imap_async – map 和 map_async 的惰性版本。

    • Pool.starmap 方法,除了接受多个参数之外,与 map 方法非常相似。

    • Async 方法一次提交所有进程,并在完成后检索结果。使用get方法获取结果。

    • Pool.map(或Pool.apply)方法与Python内置的map(或apply)非常相似。它们阻塞主进程,直到所有进程完成并返回结果。

    示例:

    地图

    一次调用一系列作业

    results = pool.map(func, [1, 2, 3])
    

    申请

    只能为一项工作调用

    for x, y in [[1, 1], [2, 2]]:
        results.append(pool.apply(func, (x, y)))
    
    def collect_result(result):
        results.append(result)
    

    map_async

    一次调用一系列作业

    pool.map_async(func, jobs, callback=collect_result)
    

    apply_async

    只能为一个作业调用,并在后台并行执行一个作业

    for x, y in [[1, 1], [2, 2]]:
        pool.apply_async(worker, (x, y), callback=collect_result)
    

    星图

    pool.map 的变体,支持多个参数

    pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
    

    starmap_async

    starmap() 和 map_async() 的组合,它迭代可迭代的可迭代对象,并在未打包的可迭代对象的情况下调用 func。返回一个结果对象。

    pool.starmap_async(calculate_worker, [(1, 1), (2, 1), (3, 1)], callback=collect_result)
    

    参考:

    在此处查找完整文档:https://docs.python.org/3/library/multiprocessing.html

    【讨论】:

    • Pool.starmap() 被阻塞
    • 我喜欢这个答案,+1
    • 如果 apply 没有并发,那它的意义何在?使用?
    【解决方案3】:

    关于applymap

    pool.apply(f, args)f 仅在池的一个工人中执行。所以池中的一个进程将运行f(args)

    pool.map(f, iterable):此方法将可迭代对象分割成若干块,作为单独的任务提交给进程池。因此,您可以利用池中的所有进程。

    【讨论】:

    • 如果iterable是一个生成器会怎样
    • 嗯...好问题。老实说,我从未使用过带生成器的池,但这个线程可能会有所帮助:stackoverflow.com/questions/5318936/…
    • @kakhkAtion 关于申请,如果只有一个工人执行该功能,其余工人做什么?我是否必须多次调用 apply 才能让其他工作人员执行任务?
    • 是的。如果您想以异步方式为工作人员提供午餐,还请查看 pool.apply_async 。 “pool_apply 阻塞直到结果准备好,所以 apply_async() 更适合并行执行工作”
    • 当我有 4 个进程但调用了 8 次 apply_async() 时会发生什么?会自动排队处理吗?
    猜你喜欢
    • 2014-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多