【问题标题】:Embarrassingly parallel for loop with complex outputs in each iteration在每次迭代中具有复杂输出的令人尴尬的并行 for 循环
【发布时间】:2017-09-24 14:38:25
【问题描述】:

我在 python 中有一个令人尴尬的并行 for 循环(重复 n 次),每次迭代执行一个复杂的任务并返回 numpy 数组和 dict 的混合(所以不是一个数字填充到数组中 - 否则现在将它们视为复杂的一堆)。重复不需要以任何特定顺序 - 我只需要能够唯一地识别n 迭代中的每个i(例如,在重复中独立保存结果)。事实上,它们甚至不需要通过索引/计数器来识别,而是一种独特的东西,因为它们不需要被排序(我可以轻松地将它们填充回更大的数组中。)

举一个更具体的例子,我想并行化以下任务:

def do_complex_task(complex_input1, input2, input3, input_n):
  "all important computation done here - independent of i or n"

  inner_result1, inner_result2 = np.zeros(100), np.zeros(100)
  for smaller_input in complex_input1:
    inner_result1 = do_another_complex_task(smaller_input, input2, input3, input_n)
    inner_result2 = do_second_complex_task(smaller_input, input2, input3, input_n)

  # do some more to produce few more essential results
  dict_result = blah()

  unique_identifier = get_unique_identifier_for_this_thread() # I don't know how

  # save results for each repetition independently before returning, 
  # instead of waiting for full computation to be done which can take a while
  out_path = os.path.join(out_dir, 'repetition_{}.pkl'.format(unique_identifier))

  return inner_result1, inner_result2, inner_result_n, dict_result


def main_compute()
  "main method to run the loop"

  n = 256 # ideally any number, but multiples of 4 possible, for even parallelization.

  result1  = np.zeros([n, 100])
  result2  = np.zeros([n, 100])
  result_n = np.zeros([n, 100])
  dict_result = list()

  # this for loop does not need to be computed in any order (range(n) is an illustration)
  # although this order would be ideal, as it makes it easy to populate results into a bigger array
  for i in range(n):
    # this computation has nothing to do with i or n!
    result1[i, :], result2[i, :], result_n[i, :], dict_result[i] = do_complex_task(complex_input1, input2, input3, input_n)

  # I need to parallelize the above loop to speed up stupidly parallel processing.


if __name__ == '__main__':
    pass

我的阅读范围相当广泛,但不清楚哪种策略更聪明、更简单,而且没有任何可靠性问题。

complex_input1 也可以很大 - 所以我不希望在酸洗时产生大量 I/O 开销。

我当然可以返回一个列表(包含所有复杂部分),该列表会附加到主列表中,以后可以将其组装成我喜欢的格式(矩形数组等)。例如,这可以通过joblib 轻松完成。不过,我正在努力向大家学习以找出好的解决方案。

编辑:我想我正在解决以下解决方案。让我知道它可能出了什么问题,或者我如何在速度、无副作用等方面进一步改进它。在我的笔记本电脑上进行了几次非结构化试验后,不清楚是否有明显的加速因为这个。

from multiprocessing import Pool, Manager
chunk_size = int(np.ceil(num_repetitions/num_procs))
with Manager() as proxy_manager:
    shared_inputs = proxy_manager.list([complex_input1, input2, another, blah])
    partial_func_holdout = partial(key_func_doing_work, *shared_inputs)

    with Pool(processes=num_procs) as pool:
        results = pool.map(partial_func_holdout, range(num_repetitions), chunk_size)

【问题讨论】:

  • 重读您的问题,您似乎不想使用数组(列表),因为您的结果类型很复杂。别担心。无论如何,将它们放在一个列表中。结果类型有多复杂并不重要,只要它是单个对象即可。但是,如果您清楚地定义结果类会更容易,因此更清楚的是它是一个单一的结果(有很多部分)。
  • 谢谢肯尼。我同意你的观点“结果类型有多复杂并不重要,只要它是一个对象。”这当然是我不介意实施的一个选项 - 只是想看看其他 python 专家和用户可以提出什么建议 - 我想不出完全不同的解决方案。
  • 另外,我宁愿不定义一个新的class 只是为了将几个数组放在一起。让我们看看其他人的建议。

标签: python numpy parallel-processing


【解决方案1】:

multiprocessing.Pool.map 的形式有一个内置的解决方案

import multiprocessing
from functools import partial

def do_task(a, b):
    return (42, {'x': a * 2, 'y': b[::-1]})

if __name__ == '__main__':
    a_values = ['Hello', 'World']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(partial(do_task, b='fixed b value'), a_values)
    print(results)

此后,results 将包含与a_values 相同顺序的结果。

要求是参数和返回值是 Pickle'able。除此之外,它们可能很复杂,但如果数据量很大,可能会降低一些性能。

我不知道这是否是您认为的好解决方案;我已经用过很多次了,效果很好。

您可以将返回值放在一个类中,但我个人认为这并没有真正带来好处,因为 Python 没有静态类型检查。

它只会并行启动 #processes 作业。它们应该是独立的,顺序无关紧要(我认为它们是按提供的顺序开始的,但它们可能以另一个顺序完成)。

基于this answer的示例。

【讨论】:

  • 谢谢你,马克,感谢你帮助我的努力。能否评论一下names 是什么以及为什么没有使用a_values
  • 另外,由于顺序对我来说并不重要,还有没有办法提高性能,因为并行任务可以随机启动并独立处理。
  • 对不起,我忘记改名字了,现在修好了。这假设 3 个作业可以以任意顺序独立并行运行。除非您知道作业需要多长时间并进行一些高级调度,否则这已经充分利用了并行性(当然,如果您有更多内核或任务不占用 100% cpu,您可以增加进程数)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-03-29
  • 2022-01-13
  • 2013-04-29
  • 1970-01-01
  • 1970-01-01
  • 2017-02-22
相关资源
最近更新 更多