【发布时间】:2019-04-02 05:24:33
【问题描述】:
我正在对模拟的时间序列进行分析。基本上,它对每个时间步执行相同的任务。由于时间步数非常多,并且每个时间步的分析都是独立的,我想创建一个可以多处理另一个函数的函数。后者将有参数,并返回一个结果。
使用共享字典和 lib concurrent.futures,我设法写了这个:
import concurrent.futures as Cfut
def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
# function : function that is running in parallel
# param_list : list of items
# group_size : size of the groups
# Nworkers : number of group/items running in the same time
# **param_fixed : passing parameters
manager = mlp.Manager()
dic = manager.dict()
executor = Cfut.ProcessPoolExecutor(Nworkers)
futures = [executor.submit(function, param, dic, *args)
for param in grouper(param_list, group_size)]
Cfut.wait(futures)
return [dic[i] for i in sorted(dic.keys())]
通常,我可以这样使用它:
def read_file(files, dictionnary):
for file in files:
i = int(file[4:9])
#print(str(i))
if 'bz2' in file:
os.system('bunzip2 ' + file)
file = file[:-4]
dictionnary[i] = np.loadtxt(file)
os.system('bzip2 ' + file)
Map = np.array(multiprocess_loop_grouped(read_file, list_alti, Group_size, N_thread))
或者像这样:
def autocorr(x):
result = np.correlate(x, x, mode='full')
return result[result.size//2:]
def find_lambda_finger(indexes, dic, Deviation):
for i in indexes :
#print(str(i))
# Beach = Deviation[i,:] - np.mean(Deviation[i,:])
dic[i] = Anls.find_first_max(autocorr(Deviation[i,:]), valmax = True)
args = [Deviation]
Temp = Rescal.multiprocess_loop_grouped(find_lambda_finger, range(Nalti), Group_size, N_thread, *args)
基本上,它正在工作。但它运作不佳。有时它会崩溃。有时它实际上会启动多个等于 Nworkers 的 python 进程,有时在我指定 Nworkers = 15 时一次只运行 2 或 3 个。
例如,我获得的一个经典错误在我提出的以下主题中进行了描述:Calling matplotlib AFTER multiprocessing sometimes results in error : main thread not in main loop
实现我想要的更 Pythonic 的方式是什么?我怎样才能改进控制这个功能?如何控制更多运行python进程的数量?
【问题讨论】:
-
你要多进程的功能总是一样吗?或者您是否需要原型
multiprocess_loop_grouped来处理任意函数?流程需要哪些数据?只是时间步长?还是其他东西? -
正如您在两个使用示例中看到的那样,我已经把主要信息放在了结尾,我希望它可以与任意函数一起使用,因为我在分析数据。通常,我必须对 1000 组数据运行第 2 步,然后对 1000 组数据运行第 3 步,然后对第 2 步和第 3 步的 1000 个输出运行第 4 步。数据存储在一个 numpy 数组中可以循环/多进程。例如,我给出的第二个示例中的“偏差”。
标签: python python-3.x multiprocessing shared-memory concurrent.futures