【Posted】:2020-09-06 09:31:14
【Question】:
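I can submit batches of concurrent.futures.ProcessPoolExecutor.submit() calls, where each batch may contain several submit() calls. However, I noticed that if each batch consumes a significant amount of RAM, RAM usage can become quite inefficient: every future in a batch has to complete before another batch of submit() calls can be issued.
How can I create a continuous stream of Python's concurrent.futures.ProcessPoolExecutor.submit() calls until some condition is satisfied?
Test script: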
#!/usr/bin/env python3
import numpy as np
from numpy.random import default_rng, SeedSequence
import concurrent.futures as cf
from itertools import count


def dojob( process, iterations, samples, rg ):
    # Do some tasks
    result = []
    for i in range( iterations ):
        a = rg.standard_normal( samples )
        b = rg.integers( -3, 3, samples )
        mean = np.mean( a + b )
        result.append( ( i, mean ) )
    return { process : result }

if __name__ == '__main__':
    cpus = 2
    iterations = 10000
    samples = 1000

    # Setup NumPy Random Generator
    ss = SeedSequence( 1234567890 )
    child_seeds = ss.spawn( cpus )
    rg_streams = [ default_rng(s) for s in child_seeds ]

    # Perform concurrent analysis by batches
    counter = count( start=0, step=1 )

    # Serial Run of dojob
    process = next( counter )
    for cpu in range( cpus ):
        process = next( counter )
        rg = rg_streams[ cpu ]
        rdict = dojob( process, iterations, samples, rg )
    print( 'rdict', rdict )

    # Concurrent Run of dojob
    futures = []
    results = []
    with cf.ProcessPoolExecutor( max_workers=cpus ) as executor:
        while True:
            # Submit one batch: one job per CPU
            for cpu in range( cpus ):
                process = next( counter )
                rg = rg_streams[ cpu ]
                futures.append( executor.submit( dojob, process, iterations, samples, rg ) )

            # Blocks until every future in the batch has completed
            for future in cf.as_completed( futures ):
                # Do some post processing
                r = future.result()
                for k, v in r.items():
                    if len( results ) < 5000:
                        results.append( np.std( v ) )
                        print( k, len(results) )

            if len(results) <= 100: #Put a huge number to simulate continuous streaming
                futures = []
                child_seeds = child_seeds[0].spawn( cpus )
                rg_streams = [ default_rng(s) for s in child_seeds ]
            else:
                break

    print( '\n*** Concurrent Analyses Ended ***' )
【Discussion】:
-
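Please edit your question to include a minimal example that shows the problem. It should be a minimal reproducible example, so that I can paste it into a file and run it to see the problem you are seeing.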
-
@barny Thanks for your help. I have added a test script that broadly illustrates the batch approach.
-
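Wouldn't it be enough to just wait for any one of the futures to complete (e.g. using fut.add_done_callback()) and, once that happens, check whether there is "room" to spawn new work? You can use threading.Event to wait efficiently for that outcome.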
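The suggestion in the last comment could be sketched roughly as below. This is only an illustration under assumptions, not the asker's code: `stream_jobs`, `square`, `make_args`, `enough` and `max_pending` are hypothetical names, and `square` stands in for a real job such as `dojob`. Each finished future wakes the main thread through a `threading.Event`, and the main thread then tops the pool back up instead of waiting for a whole batch.

```python
import concurrent.futures as cf
import threading
from collections import deque
from itertools import count


def square(n):
    # Hypothetical stand-in for a real job such as dojob().
    return n * n


def stream_jobs(executor, job, make_args, enough, max_pending):
    """Keep up to max_pending jobs in flight until enough(results) is true.

    Hypothetical helper: job and its arguments must be picklable when
    executor is a ProcessPoolExecutor.
    """
    results = []
    done = deque()            # finished futures; deque.append is thread-safe
    wake = threading.Event()  # set whenever some future finishes
    pending = 0               # submitted but not yet collected
    ids = count()

    def on_done(future):
        # Called in a thread of the submitting process when the future ends.
        done.append(future)
        wake.set()

    while True:
        # Top up: submit only while there is room and more work is wanted.
        while pending < max_pending and not enough(results):
            future = executor.submit(job, *make_args(next(ids)))
            future.add_done_callback(on_done)
            pending += 1
        if pending == 0:
            break             # nothing in flight and the condition is met
        wake.wait()           # sleep until at least one future has finished
        wake.clear()
        while done:
            # result() re-raises any exception raised inside the job.
            results.append(done.popleft().result())
            pending -= 1
    return results
```

To use it with processes, one would presumably create `cf.ProcessPoolExecutor(max_workers=cpus)` under the `if __name__ == '__main__':` guard and call something like `stream_jobs(executor, square, lambda i: (i,), lambda r: len(r) >= 100, max_pending=cpus)`. Jobs already in flight when the condition becomes true are still collected, so the result list can overshoot the target by up to `max_pending - 1` entries.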