【问题标题】:How to use multiprocessing for accelerate the following function?如何使用多处理来加速以下功能?
【发布时间】:2019-10-03 01:53:01
【问题描述】:

我有以下 for 循环:

for j in range(len(a_nested_list_of_ints)):
    arr_1_, arr_2_, arr_3_ = foo(a_nested_list_of_ints[j])
    arr_1[j,:] = arr_1_.data.numpy()
    arr_2[j,:] = arr_2_.data.numpy()
    arr_3[j,:] = arr_3_.data.numpy()

a_nested_list_of_ints 是一个嵌套的整数列表。然而,它需要很多时间才能完成。如何通过多处理对其进行优化?到目前为止,我尝试使用multiprocessing

p = Pool(5)
for j in range(len(a_nested_list_of_ints)):
    arr_1_, arr_2_, arr_3_ = p.map(foo,a_nested_list_of_ints[j])
    arr_1[j,:] = arr_1_.data.numpy()
    arr_2[j,:] = arr_2_.data.numpy()
    arr_3[j,:] = arr_3_.data.numpy()

但是,我得到:

ValueError: not enough values to unpack (expected 3, got 2)

这里:

    arr_1_, arr_2_, arr_3_ = p.map(foo,a_nested_list_of_ints[j])

知道如何使上述操作更快吗?我什至也尝试过使用星图,但它不起作用。

【问题讨论】:

  • 最简单的答案是改进foo 的任何内容,但是由于您没有包含它,所以真的没办法说。在跳转到多处理之前优化函数可能是一个更好的主意。
  • 尝试x = p.map(...) 并检查返回的x。显然,这不是您认为应该的 3 个对象。 arr1,arr2,arr3=... 解包非常无情,并没有提供太多调试帮助。

标签: python python-3.x numpy python-multiprocessing


【解决方案1】:

这是我经常使用的多处理实现。它会将您的列表(在本例中为 a_nested_list_of_ints)拆分为您拥有的多个内核。然后它在每个拆分列表上运行您的 foo 函数,每个核心一个列表。

def split_list(all_params, instances):
    return list(np.array_split(all_params, instances))

# split the list up into equal chucks for each core
n_proc = multiprocessing.cpu_count()
split_items = split_list(to_calc, n_proc)

# create the multiprocessing pool
pool = Pool(processes=n_proc)
all_calcs = []
for i in range(n_proc):
    # the arguments to the foo definition have to be a tuple - (split[i],)
    async_calc = pool.apply_async(foo, (split_items[i],))
    all_calcs.append(async_calc)

pool.close()
pool.join()

# get results
all_results = []
for result in all_calcs:
    all_results += result.get()

print(all_results)

【讨论】:

  • 在我的情况下 to_calca_nested_list_of_ints[j]?
  • 如果您有兴趣,我更新了问题并添加了赏金
  • 当然。如果我正确理解你的问题,你想用三个 numpy 数组完成吗?多处理完成后将数据转换成这种格式有什么问题吗?我想如果你修改你的foo 函数会容易得多。
  • 好的。 hpaulj 建议像这样使用 zip 有什么问题:arr_1_, arr_2_, arr_3_ = zip(*p.map(foo,a_nested_list_of_ints))
  • 他没有考虑代码的下一部分,其中数组被分配了索引
【解决方案2】:

这是一个有效的pool 演示:

In [11]: def foo(i): 
    ...:     return np.arange(i), np.arange(10-i) 
    ...:                                                                        
In [12]: with multiprocessing.Pool(processes=2) as pool: 
    ...:     x = pool.map(foo, range(10)) 
    ...:                                                                        
In [13]: x                                                                      
Out[13]: 
[(array([], dtype=int64), array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
 (array([0]), array([0, 1, 2, 3, 4, 5, 6, 7, 8])),
 (array([0, 1]), array([0, 1, 2, 3, 4, 5, 6, 7])),
 (array([0, 1, 2]), array([0, 1, 2, 3, 4, 5, 6])),
 (array([0, 1, 2, 3]), array([0, 1, 2, 3, 4, 5])),
 (array([0, 1, 2, 3, 4]), array([0, 1, 2, 3, 4])),
 (array([0, 1, 2, 3, 4, 5]), array([0, 1, 2, 3])),
 (array([0, 1, 2, 3, 4, 5, 6]), array([0, 1, 2])),
 (array([0, 1, 2, 3, 4, 5, 6, 7]), array([0, 1])),
 (array([0, 1, 2, 3, 4, 5, 6, 7, 8]), array([0]))]

pool.map 正在进行迭代,而不是一些外部 for 循环。

为了更接近你的例子:

In [14]: def foo(alist): 
    ...:     return np.arange(*alist), np.zeros(alist,int) 
    ...:      
    ...:                                                                        
In [15]: alists=[(0,3),(1,4),(1,6,2)]                                           
In [16]: with multiprocessing.Pool(processes=2) as pool: 
    ...:     x = pool.map(foo, alists) 
    ...:                                                                        
In [17]: x                                                                      
Out[17]: 
[(array([0, 1, 2]), array([], shape=(0, 3), dtype=int64)),
 (array([1, 2, 3]), array([[0, 0, 0, 0]])),
 (array([1, 3, 5]), array([[[0, 0],
          [0, 0],
          [0, 0],
          [0, 0],
          [0, 0],
          [0, 0]]]))]

请注意,pool.map 返回一个列表,其中包含从alists 生成的所有案例。解压 x 是没有意义的。

 x,y = pool.map(...)   # too many values to pack error

我可以使用zip* 成语解压x

In [21]: list(zip(*x))                                                          
Out[21]: 
[(array([0, 1, 2]), array([1, 2, 3]), array([1, 3, 5])),
 (array([], shape=(0, 3), dtype=int64), array([[0, 0, 0, 0]]), array([[[0, 0],
          [0, 0],
          [0, 0],
          [0, 0],
          [0, 0],
          [0, 0]]]))]

这是 2 个元组的列表;实际上是转置的列表版本。这可以解压:

In [23]: y,z = zip(*x)                                                          
In [24]: y                                                                      
Out[24]: (array([0, 1, 2]), array([1, 2, 3]), array([1, 3, 5]))
In [25]: z                                                                      
Out[25]: 
(array([], shape=(0, 3), dtype=int64), array([[0, 0, 0, 0]]), array([[[0, 0],
         [0, 0],
         [0, 0],
         [0, 0],
         [0, 0],
         [0, 0]]]))

【讨论】:

  • 我试过了:x = foo(a_nested_list_of_ints[i]),我得到了这个:Columns 0 to 9 0.1590 -0.1165 -0.1300 0.1441 -0.3016 0.2087 0.2543 -0.0794 -0.1356 0.1614,类型是(Variable containing:....,我应该如何继续?
  • 我想你在另一个问题中说过foo 是一个torch 函数,它可能返回某种torch 数组,您尝试将其转换为ndarrayarr.data.numpy() .我没有torch或相关包,所以不能直接帮忙。
  • 索引分配会发生什么? arr_2[j,:]
  • 您可能需要在解压缩列表上执行单独的j 循环。我对torchdata.numpy() 转换了解不多,无法知道是否有一些连接或其他操作可以对所有元素进行转换。
  • 你能提供一个单独循环的例子吗?
猜你喜欢
  • 2012-08-20
  • 1970-01-01
  • 2022-01-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-06-10
  • 1970-01-01
  • 2022-06-21
相关资源
最近更新 更多