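What is the simplest way to parallelize this code?
Use a PoolExecutor from concurrent.futures. Compare the original code with this, side by side. First, the most concise way to approach this is with executor.map: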

    ...
    with ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(calc_stuff, parameters):
            ...

or broken down by submitting each call individually:

    ...
    with ThreadPoolExecutor() as executor:
        futures = []
        for parameter in parameters:
            futures.append(executor.submit(calc_stuff, parameter))

        for future in futures:
            out1, out2, out3 = future.result() # this will block
            ...

Leaving the context signals the executor to free up its resources.
You can use threads or processes through exactly the same interface.
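
To make the "same interface" point concrete, here is a minimal sketch that runs both styles (map and submit) against either executor class. The calc_stuff below is a hypothetical stand-in for the function in the question (it just returns three values), and the helper names run_with_map and run_with_submit are made up for this illustration.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def calc_stuff(parameter):
    # Hypothetical stand-in for the question's function: returns three outputs.
    return parameter, parameter ** 2, parameter ** 3

def run_with_map(executor_class, parameters):
    with executor_class() as executor:
        # executor.map yields results in the order of the inputs.
        return list(executor.map(calc_stuff, parameters))

def run_with_submit(executor_class, parameters):
    with executor_class() as executor:
        futures = [executor.submit(calc_stuff, p) for p in parameters]
        # result() blocks until that particular call has finished.
        return [future.result() for future in futures]

if __name__ == '__main__':
    parameters = range(5)
    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        via_map = run_with_map(executor_class, parameters)
        via_submit = run_with_submit(executor_class, parameters)
        print(executor_class.__name__, via_map == via_submit, via_map)
```

Only the class passed in changes; the calling code is identical. With a process pool, the function and its arguments must be picklable, which is why calc_stuff is defined at module level here.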
A working example
Here is working example code. Put it in a file named futuretest.py:

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    from time import time
    from http.client import HTTPSConnection

    def processor_intensive(arg):
        def fib(n): # recursive, processor intensive calculation (avoid n > 36)
            return fib(n-1) + fib(n-2) if n > 1 else n
        start = time()
        result = fib(arg)
        return time() - start, result

    def io_bound(arg):
        start = time()
        con = HTTPSConnection(arg)
        con.request('GET', '/')
        result = con.getresponse().getcode()
        return time() - start, result

    def manager(PoolExecutor, calc_stuff):
        if calc_stuff is io_bound:
            inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                      'noaa.gov', 'parler.com', 'aaronhall.dev')
        else:
            inputs = range(25, 32)
        timings, results = list(), list()
        start = time()
        with PoolExecutor() as executor:
            for timing, result in executor.map(calc_stuff, inputs):
                # put results into correct output list:
                timings.append(timing)
                results.append(result)
        finish = time()
        print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
        print(f'wall time to execute: {finish-start}')
        print(f'total of timings for each call: {sum(timings)}')
        print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
        print(dict(zip(inputs, results)), end = '\n\n')

    def main():
        for computation in (processor_intensive, io_bound):
            for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
                manager(pool_executor, calc_stuff=computation)

    if __name__ == '__main__':
        main()

Here is the output from one run of python -m futuretest:

    processor_intensive, ProcessPoolExecutor
    wall time to execute: 0.7326343059539795
    total of timings for each call: 1.8033506870269775
    time saved by parallelizing: 1.070716381072998
    {25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

    processor_intensive, ThreadPoolExecutor
    wall time to execute: 1.190223217010498
    total of timings for each call: 3.3561410903930664
    time saved by parallelizing: 2.1659178733825684
    {25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

    io_bound, ProcessPoolExecutor
    wall time to execute: 0.533886194229126
    total of timings for each call: 1.2977914810180664
    time saved by parallelizing: 0.7639052867889404
    {'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

    io_bound, ThreadPoolExecutor
    wall time to execute: 0.38941240310668945
    total of timings for each call: 1.6049387454986572
    time saved by parallelizing: 1.2155263423919678
    {'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

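Processor-intensive analysis
When performing processor-intensive calculations in Python, expect the ProcessPoolExecutor to be more performant than the ThreadPoolExecutor.
Because of the Global Interpreter Lock (a.k.a. the GIL), threads cannot use multiple processors, so expect both the time for each calculation and the wall time (elapsed real time) to be greater.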
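IO-bound analysis
On the other hand, when performing IO-bound operations, expect ThreadPoolExecutor to be more performant than ProcessPoolExecutor.
Python's threads are real operating system threads. The operating system can put them to sleep and reawaken them when the information they are waiting for arrives.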
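
The effect of sleeping threads can be seen without any network access. The sketch below is an illustration only: fake_io and timed_run are hypothetical names, and time.sleep stands in for a blocking network call, on the assumption that both block the thread and release the GIL while they wait.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep

def fake_io(delay):
    # Hypothetical stand-in for a network request: the thread blocks here
    # and other threads are free to run while it waits.
    start = perf_counter()
    sleep(delay)
    return perf_counter() - start

def timed_run(delays):
    """Return (wall_time, per_call_timings) for running fake_io in threads."""
    start = perf_counter()
    with ThreadPoolExecutor(max_workers=len(delays)) as executor:
        timings = list(executor.map(fake_io, delays))
    return perf_counter() - start, timings

if __name__ == '__main__':
    wall, timings = timed_run([0.2] * 4)
    print(f'wall time to execute: {wall:.2f}')
    print(f'total of timings for each call: {sum(timings):.2f}')
```

Because the four waits overlap, the wall time should be close to a single delay, while the per-call timings still add up to roughly four times that.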
Final thoughts
I suspect that multiprocessing will be slower on Windows, since Windows doesn't support forking, so each new process has to take time to launch.
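
If you want to check how new processes are started on your own platform, the multiprocessing module reports it. This is a small sketch, not part of futuretest.py; describe_start_methods is a hypothetical helper name. On Windows only 'spawn' is available, while Unix-like systems typically also offer 'fork'.

```python
import multiprocessing

def describe_start_methods():
    """Return (default_method, available_methods) for this platform."""
    default = multiprocessing.get_start_method()
    available = multiprocessing.get_all_start_methods()
    return default, available

if __name__ == '__main__':
    default, available = describe_start_methods()
    print(f'default start method: {default}')
    print(f'available start methods: {available}')
```

A 'spawn' start launches a fresh interpreter for each worker, which is consistent with the startup cost suspected above, though the actual difference is something to measure rather than assume.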
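You can nest multiple threads inside multiple processes, but it's recommended not to use multiple threads to spin off multiple processes.
If you face a heavy processing problem in Python, you can trivially scale with additional processes, but not so much with threading.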