【发布时间】:2012-01-21 22:18:50
【问题描述】:
我还没有看到Pool.apply、Pool.apply_async 和Pool.map 用例的明确示例。我主要用Pool.map;别人的优点是什么?
【问题讨论】:
标签: python multithreading concurrency multiprocessing
我还没有看到Pool.apply、Pool.apply_async 和Pool.map 用例的明确示例。我主要用Pool.map;别人的优点是什么?
【问题讨论】:
标签: python multithreading concurrency multiprocessing
在 Python 的旧时代,要调用带有任意参数的函数,您可以使用 apply:
apply(f,args,kwargs)
apply 在 Python2.7 中仍然存在,但在 Python3 中没有,一般不再使用。如今,
f(*args,**kwargs)
是首选。 multiprocessing.Pool 模块尝试提供类似的接口。
Pool.apply 类似于 Python apply,只是函数调用是在单独的进程中执行的。 Pool.apply 阻塞直到函数完成。
Pool.apply_async 也类似于 Python 内置的 apply,只是调用立即返回而不是等待结果。返回一个AsyncResult 对象。您调用它的get() 方法来检索函数调用的结果。 get() 方法阻塞,直到函数完成。因此,pool.apply(func, args, kwargs) 等价于pool.apply_async(func, args, kwargs).get()。
与Pool.apply 相比,Pool.apply_async 方法还有一个回调,如果提供了该回调,则会在函数完成时调用。这可以用来代替调用get()。
例如:
import multiprocessing as mp
import time
def foo_pool(x):
time.sleep(2)
return x*x
result_list = []
def log_result(result):
# This is called whenever foo_pool(i) returns a result.
# result_list is modified only by the main process, not the pool workers.
result_list.append(result)
def apply_async_with_callback():
pool = mp.Pool()
for i in range(10):
pool.apply_async(foo_pool, args = (i, ), callback = log_result)
pool.close()
pool.join()
print(result_list)
if __name__ == '__main__':
apply_async_with_callback()
可能会产生如下结果
[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]
请注意,与pool.map 不同,结果的顺序可能与进行pool.apply_async 调用的顺序不一致。
因此,如果您需要在单独的进程中运行函数,但希望当前进程阻塞直到该函数返回,请使用Pool.apply。与Pool.apply 一样,Pool.map 会一直阻塞,直到返回完整的结果。
如果您希望工作进程池异步执行许多函数调用,请使用Pool.apply_async。结果的顺序不保证与Pool.apply_async的调用顺序相同。
另请注意,您可以使用Pool.apply_async 调用许多不同的函数(并非所有调用都需要使用相同的函数)。
相比之下,Pool.map 将相同的函数应用于许多参数。
但是,与Pool.apply_async 不同的是,结果的返回顺序与参数的顺序相对应。
【讨论】:
apply_async_with_callback()之前应该有if __name__=="__main__"吗?
Pool.map(func,iterable) 等价于Pool.map_async(func,iterable).get()。所以Pool.map和Pool.map_async之间的关系类似于Pool.apply和Pool.apply_async。 async 命令立即返回,而非async 命令阻塞。 async 命令也有回调。
Pool.map 和Pool.apply 之间做出决定类似于在Python 中决定何时使用map 或apply。您只需使用适合工作的工具。决定使用async 和非async 版本取决于您是否希望调用阻止当前进程和/或是否要使用回调。
apply_async 都会返回一个 ApplyResult 对象。调用ApplyResult 的get 方法将返回关联函数的返回值(如果调用超时,则引发mp.TimeoutError。)因此,如果您将ApplyResults 放在有序列表中,然后调用它们的@ 987654375@ 方法将以相同的顺序返回结果。但是,在这种情况下,您可以只使用 pool.map。
pool._result_handler线程一次处理一个返回值,将返回值传递给回调函数。所以你可以保证每个返回值都会调用一次回调函数,并且这里没有并发问题,因为回调是由主进程中的单个线程顺序调用的。
这里是一个表格格式的概述,以显示Pool.apply、Pool.apply_async、Pool.map 和Pool.map_async 之间的区别。在选择一个时,您必须考虑多参数、并发、阻塞和排序:
| Multi-args Concurrence Blocking Ordered-results
---------------------------------------------------------------------
Pool.map | no yes yes yes
Pool.map_async | no yes no yes
Pool.apply | yes no yes no
Pool.apply_async | yes yes no no
Pool.starmap | yes yes yes yes
Pool.starmap_async| yes yes no no
Pool.imap 和 Pool.imap_async – map 和 map_async 的惰性版本。
Pool.starmap 方法,除了接受多个参数之外,与 map 方法非常相似。
Async 方法一次提交所有进程,并在完成后检索结果。使用get方法获取结果。
Pool.map(或Pool.apply)方法与Python内置的map(或apply)非常相似。它们阻塞主进程,直到所有进程完成并返回结果。
一次调用一系列作业
results = pool.map(func, [1, 2, 3])
只能为一项工作调用
for x, y in [[1, 1], [2, 2]]:
results.append(pool.apply(func, (x, y)))
def collect_result(result):
results.append(result)
一次调用一系列作业
pool.map_async(func, jobs, callback=collect_result)
只能为一个作业调用,并在后台并行执行一个作业
for x, y in [[1, 1], [2, 2]]:
pool.apply_async(worker, (x, y), callback=collect_result)
是pool.map 的变体,支持多个参数
pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
starmap() 和 map_async() 的组合,它迭代可迭代的可迭代对象,并在未打包的可迭代对象的情况下调用 func。返回一个结果对象。
pool.starmap_async(calculate_worker, [(1, 1), (2, 1), (3, 1)], callback=collect_result)
在此处查找完整文档:https://docs.python.org/3/library/multiprocessing.html
【讨论】:
关于apply 与map:
pool.apply(f, args):f 仅在池的一个工人中执行。所以池中的一个进程将运行f(args)。
pool.map(f, iterable):此方法将可迭代对象分割成若干块,作为单独的任务提交给进程池。因此,您可以利用池中的所有进程。
【讨论】:
apply_async() 时会发生什么?会自动排队处理吗?