【发布时间】:2017-09-24 14:38:25
【问题描述】:
我在 python 中有一个令人尴尬的并行 for 循环(重复 n 次),每次迭代执行一个复杂的任务并返回 numpy 数组和 dict 的混合(所以不是一个数字填充到数组中 - 否则现在将它们视为复杂的一堆)。重复不需要以任何特定顺序 - 我只需要能够唯一地识别n 迭代中的每个i(例如,在重复中独立保存结果)。事实上,它们甚至不需要通过索引/计数器来识别,而是一种独特的东西,因为它们不需要被排序(我可以轻松地将它们填充回更大的数组中。)
举一个更具体的例子,我想并行化以下任务:
def do_complex_task(complex_input1, input2, input3, input_n):
"all important computation done here - independent of i or n"
inner_result1, inner_result2 = np.zeros(100), np.zeros(100)
for smaller_input in complex_input1:
inner_result1 = do_another_complex_task(smaller_input, input2, input3, input_n)
inner_result2 = do_second_complex_task(smaller_input, input2, input3, input_n)
# do some more to produce few more essential results
dict_result = blah()
unique_identifier = get_unique_identifier_for_this_thread() # I don't know how
# save results for each repetition independently before returning,
# instead of waiting for full computation to be done which can take a while
out_path = os.path.join(out_dir, 'repetition_{}.pkl'.format(unique_identifier))
return inner_result1, inner_result2, inner_result_n, dict_result
def main_compute()
"main method to run the loop"
n = 256 # ideally any number, but multiples of 4 possible, for even parallelization.
result1 = np.zeros([n, 100])
result2 = np.zeros([n, 100])
result_n = np.zeros([n, 100])
dict_result = list()
# this for loop does not need to be computed in any order (range(n) is an illustration)
# although this order would be ideal, as it makes it easy to populate results into a bigger array
for i in range(n):
# this computation has nothing to do with i or n!
result1[i, :], result2[i, :], result_n[i, :], dict_result[i] = do_complex_task(complex_input1, input2, input3, input_n)
# I need to parallelize the above loop to speed up stupidly parallel processing.
if __name__ == '__main__':
pass
我的阅读范围相当广泛,但不清楚哪种策略更聪明、更简单,而且没有任何可靠性问题。
complex_input1 也可以很大 - 所以我不希望在酸洗时产生大量 I/O 开销。
我当然可以返回一个列表(包含所有复杂部分),该列表会附加到主列表中,以后可以将其组装成我喜欢的格式(矩形数组等)。例如,这可以通过joblib 轻松完成。不过,我正在努力向大家学习以找出好的解决方案。
编辑:我想我正在解决以下解决方案。让我知道它可能出了什么问题,或者我如何在速度、无副作用等方面进一步改进它。在我的笔记本电脑上进行了几次非结构化试验后,不清楚是否有明显的加速因为这个。
from multiprocessing import Pool, Manager
chunk_size = int(np.ceil(num_repetitions/num_procs))
with Manager() as proxy_manager:
shared_inputs = proxy_manager.list([complex_input1, input2, another, blah])
partial_func_holdout = partial(key_func_doing_work, *shared_inputs)
with Pool(processes=num_procs) as pool:
results = pool.map(partial_func_holdout, range(num_repetitions), chunk_size)
【问题讨论】:
-
重读您的问题,您似乎不想使用数组(列表),因为您的结果类型很复杂。别担心。无论如何,将它们放在一个列表中。结果类型有多复杂并不重要,只要它是单个对象即可。但是,如果您清楚地定义结果类会更容易,因此更清楚的是它是一个单一的结果(有很多部分)。
-
谢谢肯尼。我同意你的观点“结果类型有多复杂并不重要,只要它是一个对象。”这当然是我不介意实施的一个选项 - 只是想看看其他 python 专家和用户可以提出什么建议 - 我想不出完全不同的解决方案。
-
另外,我宁愿不定义一个新的
class只是为了将几个数组放在一起。让我们看看其他人的建议。
标签: python numpy parallel-processing