【问题标题】:Accessing and altering a global array using python joblib使用 python joblib 访问和更改全局数组
【发布时间】:2015-12-07 18:24:45
【问题描述】:

我正在尝试在 python 中使用 joblib 来加速某些数据处理,但在尝试确定如何将输出分配为所需格式时遇到了问题。我试图生成一个可能过于简单的代码来显示我遇到的问题:

from joblib import Parallel, delayed
import numpy as np

def main():
    print "Nested loop array assignment:"
    regular()
    print "Parallel nested loop assignment using a single process:"
    par2(1)
    print "Parallel nested loop assignment using multiple process:"
    par2(2)

def regular():
    # Define variables
    a = [0,1,2,3,4]
    b = [0,1,2,3,4]
    # Set array variable to global and define size and shape
    global ab
    ab = np.zeros((2,np.size(a),np.size(b)))

    # Iterate to populate array
    for i in range(0,np.size(a)):
        for j in range(0,np.size(b)):
            func(i,j,a,b)

    # Show array output
    print ab

def par2(process):
    # Define variables
    a2 = [0,1,2,3,4]
    b2 = [0,1,2,3,4]
    # Set array variable to global and define size and shape
    global ab2
    ab2 = np.zeros((2,np.size(a2),np.size(b2)))

    # Parallel process in order to populate array
    Parallel(n_jobs=process)(delayed(func2)(i,j,a2,b2) for i in xrange(0,np.size(a2)) for j in xrange(0,np.size(b2)))

    # Show array output
    print ab2

def func(i,j,a,b):
    # Populate array
    ab[0,i,j] = a[i]+b[j]
    ab[1,i,j] = a[i]*b[j]

def func2(i,j,a2,b2):
    # Populate array
    ab2[0,i,j] = a2[i]+b2[j]
    ab2[1,i,j] = a2[i]*b2[j]

# Run script
main()

输出如下:

Nested loop array assignment:
[[[  0.   1.   2.   3.   4.]
  [  1.   2.   3.   4.   5.]
  [  2.   3.   4.   5.   6.]
  [  3.   4.   5.   6.   7.]
  [  4.   5.   6.   7.   8.]]

 [[  0.   0.   0.   0.   0.]
  [  0.   1.   2.   3.   4.]
  [  0.   2.   4.   6.   8.]
  [  0.   3.   6.   9.  12.]
  [  0.   4.   8.  12.  16.]]]
Parallel nested loop assignment using a single process:
[[[  0.   1.   2.   3.   4.]
  [  1.   2.   3.   4.   5.]
  [  2.   3.   4.   5.   6.]
  [  3.   4.   5.   6.   7.]
  [  4.   5.   6.   7.   8.]]

 [[  0.   0.   0.   0.   0.]
  [  0.   1.   2.   3.   4.]
  [  0.   2.   4.   6.   8.]
  [  0.   3.   6.   9.  12.]
  [  0.   4.   8.  12.  16.]]]
Parallel nested loop assignment using multiple process:
[[[ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]]

 [[ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]]]

从 Google 和 StackOverflow 搜索功能看来,使用 joblib 时,全局数组不会在每个子进程之间共享。我不确定这是 joblib 的限制还是有办法解决这个问题?

实际上,我的脚本被其他代码包围,这些代码依赖于这个全局数组的最终输出,格式为 (4,x,x),其中x 是可变的(但通常在 100 到数千之间)。这是我目前查看并行处理的原因,因为对于 x = 2400,整个过程可能需要长达 2 小时。

没有必要使用 joblib(但我喜欢它的命名法和简洁性),因此请随意提出简单的替代方法,最好记住最终数组的要求。我正在使用 python 2.7.3 和 joblib 0.7.1。

【问题讨论】:

    标签: python python-2.7 parallel-processing joblib


    【解决方案1】:

    我能够使用 numpy 的 memmap 解决这个简单示例的问题。 在使用 memmap 并遵循 joblib documentation webpage 上的示例后,我仍然遇到问题,但我通过 pip 升级到了最新的 joblib 版本(0.9.3)并且一切运行顺利。这是工作代码:

    from joblib import Parallel, delayed
    import numpy as np
    import os
    import tempfile
    import shutil
    
    def main():
    
        print "Nested loop array assignment:"
        regular()
    
        print "Parallel nested loop assignment using numpy's memmap:"
        par3(4)
    
    def regular():
        # Define variables
        a = [0,1,2,3,4]
        b = [0,1,2,3,4]
    
        # Set array variable to global and define size and shape
        global ab
        ab = np.zeros((2,np.size(a),np.size(b)))
    
        # Iterate to populate array
        for i in range(0,np.size(a)):
            for j in range(0,np.size(b)):
                func(i,j,a,b)
    
        # Show array output
        print ab
    
    def par3(process):
    
        # Creat a temporary directory and define the array path
        path = tempfile.mkdtemp()
        ab3path = os.path.join(path,'ab3.mmap')
    
        # Define variables
        a3 = [0,1,2,3,4]
        b3 = [0,1,2,3,4]
    
        # Create the array using numpy's memmap
        ab3 = np.memmap(ab3path, dtype=float, shape=(2,np.size(a3),np.size(b3)), mode='w+')
    
        # Parallel process in order to populate array
        Parallel(n_jobs=process)(delayed(func3)(i,a3,b3,ab3) for i in xrange(0,np.size(a3)))
    
        # Show array output
        print ab3
    
        # Delete the temporary directory and contents
        try:
            shutil.rmtree(path)
        except:
            print "Couldn't delete folder: "+str(path)
    
    def func(i,j,a,b):
        # Populate array
        ab[0,i,j] = a[i]+b[j]
        ab[1,i,j] = a[i]*b[j]
    
    def func3(i,a3,b3,ab3):
        # Populate array
        for j in range(0,np.size(b3)):
            ab3[0,i,j] = a3[i]+b3[j]
            ab3[1,i,j] = a3[i]*b3[j]
    
    # Run script
    main()
    

    给出以下结果:

    Nested loop array assignment:
    [[[  0.   1.   2.   3.   4.]
      [  1.   2.   3.   4.   5.]
      [  2.   3.   4.   5.   6.]
      [  3.   4.   5.   6.   7.]
      [  4.   5.   6.   7.   8.]]
    
     [[  0.   0.   0.   0.   0.]
      [  0.   1.   2.   3.   4.]
      [  0.   2.   4.   6.   8.]
      [  0.   3.   6.   9.  12.]
      [  0.   4.   8.  12.  16.]]]
    Parallel nested loop assignment using numpy's memmap:
    [[[  0.   1.   2.   3.   4.]
      [  1.   2.   3.   4.   5.]
      [  2.   3.   4.   5.   6.]
      [  3.   4.   5.   6.   7.]
      [  4.   5.   6.   7.   8.]]
    
     [[  0.   0.   0.   0.   0.]
      [  0.   1.   2.   3.   4.]
      [  0.   2.   4.   6.   8.]
      [  0.   3.   6.   9.  12.]
      [  0.   4.   8.  12.  16.]]]
    

    我的一些想法要提醒任何未来的读者:

    • 在小型阵列上,准备并行环境所需的时间 (通常称为开销)意味着运行速度比 简单的 for 循环。
    • 比较更大的数组,例如。将 aa3 设置为 np.arange(0,10000)bb3np.arange(0,1000) “常规”方法为 12.4 秒,joblib 为 7.7 秒 方法。
    • 开销意味着让每个内核执行速度更快 内部 j 循环(参见 func3)。这是有道理的,因为我只是 启动 10,000 个进程而不是启动 10,000,000 个
      每个进程都需要设置。

    【讨论】:

      【解决方案2】:

      我正在使用的joblib 版本(0.13.2),实际上让我可以轻松访问大共享DataFrames

      当然,DataFrames 需要在并行循环开始之前进行预分配,并且每个线程必须只访问它的 DataFrame 部分才能写入,但它可以工作。

      data  = pd.DataFrame(...)
      stats = pd.DataFrame(np.nan, index=np.arange(0, size/step), columns=cols, dtype=np.float64)
      
      Parallel(n_jobs=8, prefer='threads')(
                  delayed(_threadsafe_func)(data, stats, i, step, other_params)
                  for i in range(0, size, step))
      

      _threadsafe_func 内部,然后可以通过这种方式读取或写入stats DataFrame

      index = i/step
      print('[' + str(i) + '] Running job with index:', str(int(index)), '/', len(data)/step)
      chunk = data[i:i + step]
      stats.loc[index, 'mean'] = chunk.mean()    # 'mean' is an existing column already filled with np.nan
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-10-25
        • 2021-05-22
        相关资源
        最近更新 更多