【问题标题】:Parallelizing with joblib - Performance saturation and general considerations与 joblib 并行 - 性能饱和和一般注意事项
【发布时间】:2017-01-05 08:37:51
【问题描述】:

我使用 joblib 是为了在为离散数据构建概率密度的简单任务上获得一些效率。简而言之,我对我的性能改进在 2 个并行进程中饱和这一事实感到困惑,而拥有更多进程却没有任何收获。我也对优化该程序的其他可能方法感到好奇。我将首先详细介绍问题的具体细节。

我考虑一个形状为(n_samples, n_features) 的二进制数组X 和一个分类标签的向量y。出于实验的目的,这样做可以:

import numpy as np
X = np.random.randint(0,2,size=[n_samples,n_features])
y = np.random.randint(0,10,size=[n_samples,])

函数joint_probability_binary 将特征数组X(单个特征)的列和标签向量y 作为输入,并输出它们的联合分布。没有什么花哨。

def joint_probability_binary(x, y):

    labels    = list(set(y))
    joint = np.zeros([len(labels), 2])

    for i in xrange(y.shape[0]):
        joint[y[i], x[i]] += 1

    return joint / float(y.shape[0])

现在,我想将joint_probability_binary 应用于X 的每个功能(每一列)。我的理解是,这个任务(给定足够大的n_samples 值)对于多处理并行性来说是粗粒度的。我编写了一个顺序和并行函数来执行此任务。

from joblib import Parallel, delayed

def joints_sequential(X, y):
    return [joint_probability_binary(X[:,i],y) for i in range(X.shape[1])]

def joints_parallel(X, y, n_jobs):
    return Parallel(n_jobs=n_jobs, verbose=0)(
        delayed(joint_probability_binary)(X = X[:,i],y = y) 
        for i in range(X.shape[1]))

我改编了 Guido van Rossum 自己写的计时函数,呈现在here,如下:

import time

def timing(f, n, **kwargs):
    r = range(n)
    t1 = time.clock()
    for i in r:
        f(**kwargs);
        f(**kwargs);
        f(**kwargs);
        f(**kwargs);
        f(**kwargs);
        f(**kwargs);
        f(**kwargs);
        f(**kwargs);
        f(**kwargs);
        f(**kwargs);
    t2 = time.clock()
    return round(t2 - t1, 3)

最后,为了研究性能的变化及其对作业数量的依赖性,我运行了

tseq = timing(joints_sequential,10, X=X,y=y)
print('Sequential list comprehension - Finished in %s sec' %tseq)

for nj in range(1,9):
    tpar = timing(joints_parallel,10, X=X, y=y, n_jobs=nj)
    print('Parallel execution - %s jobs - Finished in %s sec' %(nj,tpar))

对于n_samples = 20000n_features = 20,我明白了

Sequential list comprehension - Finished in 60.778 sec
Parallel execution - 1 jobs - Finished in 61.975 sec
Parallel execution - 2 jobs - Finished in 6.446 sec
Parallel execution - 3 jobs - Finished in 7.516 sec
Parallel execution - 4 jobs - Finished in 8.275 sec
Parallel execution - 5 jobs - Finished in 8.953 sec
Parallel execution - 6 jobs - Finished in 9.962 sec
Parallel execution - 7 jobs - Finished in 10.382 sec
Parallel execution - 8 jobs - Finished in 11.321 sec

1.

此结果证实,并行化此任务可以获得相当多的收益(在 OS X 上运行,配备 2 GHz Intel Core i7 和 4 核)。 然而,我发现最引人注目的是n_jobs = 2 的性能已经饱和。考虑到每个任务的大小,我很难认为这可能是由 Joblib 开销单独引起的,但是我的直觉又是有限的。我用更大的数组n_samples = 200000n_features = 40 重复了这个实验,这导致了相同的行为: 顺序列表理解 - 在 1230.172 秒内完成

Parallel execution - 1 jobs - Finished in 1198.981 sec
Parallel execution - 2 jobs - Finished in 94.624 sec
Parallel execution - 3 jobs - Finished in 95.1 sec
...

是否有人对为什么会出现这种情况有直觉(鉴于我的整体方法足够合理)?

2.

最后,在整体优化方面,还有哪些其他方法可以提高此类程序的性能?我怀疑编写计算联合概率的函数的 Cython 实现会收获很多,但我没有这方面的经验。

【问题讨论】:

    标签: python optimization parallel-processing multiprocessing joblib


    【解决方案1】:

    我的经验是,这通常是因为您超额订阅核心。在带有 i7-3770 的桌面上,我得到以下信息:

    Sequential list comprehension - Finished in 25.734 sec
    Parallel execution - 1 jobs - Finished in 25.532 sec
    Parallel execution - 2 jobs - Finished in 4.302 sec
    Parallel execution - 3 jobs - Finished in 4.178 sec
    Parallel execution - 4 jobs - Finished in 4.521 sec
    

    在不了解您的系统的情况下,我无法提供太多帮助。但是,由于超线程或其他技术,笔记本电脑处理器的逻辑内核通常比物理内核多。不过,这不是一项在超线程方面做得很好的任务。 (例如,您不会看到通过使用额外线程来提高性能,因为这里的 IO 没有阻塞任何东西,所以没有太多机会)。

    您还可以拥有一个 CPU,当一个或两个核心被大量使用时,它会自动提高其时钟频率,但在所有核心都被大量使用时会下降。这可以为您提供两个内核的额外性能。

    为了获得更高的性能,我建议将您的joint_probability_binary() 函数编写为一个numpy ufunc,使用它们的from pyfunc() 函数来生成一个c 版本。 https://docs.scipy.org/doc/numpy/reference/ufuncs.html

    Numba 也可以提供帮助,但我从未使用过它 http://numba.pydata.org/numba-doc/0.35.0/index.html

    【讨论】:

      【解决方案2】:

      只要这个 SO 页面是我的 google 请求“joblib 性能”的第一个页面,我就进行了一些调查。

      是否有人对为什么会出现这种情况有直觉(鉴于我的整体方法足够合理)?

      在我看来,问题受内存限制。这种情况因测量不明确而感到困惑。我运行原始代码并通过time python3 joblib_test.py 在外部测量运行时间,在joblib_test.py 中我一直评论除了一个评估之外的所有内容。在我的 4 核 CPU 上,我使用了 n_samples = 2000000,n_features = 40,并减少了重复次数:

      1. 顺序列表理解 - 在 54.911 秒内完成
        实际0m55.307s

      2. 并行执行 - 4 个作业 - 在 2.515 秒内完成
        实际0m53.519s

      它清楚地表明,实际运行时间几乎相同。

      最后,在整体优化方面,还有哪些其他方法可以提高此类程序的性能?

      使用 numba(所以它是import numba,用@numba.jit(nopython=True,cache=True) 装饰真正的worker,对worker 稍作修改会导致加速7 倍

      1. 顺序列表理解 (mod) - 在 7.665 秒内完成
        实际0m7.167s

      2. 并行执行 (mod) - 4 个作业 - 在 2.004 秒内完成
        实际0m9.143s

      再一次,它很好地展示了受内存带宽限制的事实。对于优化版本,使用 4 个内核会产生一些开销。

      完整代码示例:

      n_samples = 2000000
      n_features = 40
      
      print("n_samples = ", n_samples, "  n_features = ", n_features)
      
      import numpy as np
      # X = np.random.randint(0,2,size=[n_samples,n_features])
      # y = np.random.randint(0,10,size=[n_samples,])
      
      def joint_probability_binary(x, y):
      
          labels    = list(set(y))
          joint = np.zeros([len(labels), 2])
      
          for i in range(y.shape[0]):
              joint[y[i], x[i]] += 1
      
          return joint / float(y.shape[0])
      
      import numba
      @numba.jit(nopython=True,cache=True)
      def joint_probability_binary_mod(x, y):
          labels    = np.unique(y)
          joint = np.zeros((labels.size, 2))
      
          for i in range(y.shape[0]):
              joint[y[i], x[i]] += 1
      
          return joint / float(y.shape[0])
      
      from joblib import Parallel, delayed
      
      def joints_sequential(the_job):
          X = np.random.randint(0,2,size=[n_samples,n_features])
          y = np.random.randint(0,10,size=[n_samples,])
          return [the_job(X[:,i],y) for i in range(X.shape[1])]
      
      
      def joints_parallel(n_jobs, the_job,batch_size='auto'):
          X = np.random.randint(0,2,size=[n_samples,n_features])
          y = np.random.randint(0,10,size=[n_samples,])
          return Parallel(n_jobs=n_jobs, verbose=0,batch_size=batch_size)(
              delayed(the_job)(x = X[:,i],y = y) 
              for i in range(X.shape[1])
          )
      
      import time
      
      def timing(f, n, **kwargs):
          r = range(n)
          t1 = time.clock()
          for i in r:
              res = f(**kwargs);
          t2 = time.clock()
          #print(np.sum(res))
          return round(t2 - t1, 3)
      
      ttime = 0
      
      # tseq = timing(joints_sequential,1, the_job=joint_probability_binary_mod)
      # print('Sequential list comprehension (mod) - Finished in %s sec' %tseq)
      # ttime+=tseq
      
      for nj in range(4,5):
          tpar = timing(joints_parallel,1,n_jobs=nj,
                        the_job=joint_probability_binary_mod,
                        batch_size = int(n_samples/nj))
          print('Parallel execution (mod) - %s jobs - Finished in %s sec' %(nj,tpar))
          ttime+=tpar
      
      # tseq = timing(joints_sequential,1, the_job=joint_probability_binary)
      # print('Sequential list comprehension - Finished in %s sec' %tseq)
      # ttime+=tseq
      
      # for nj in range(4,5):
      #     tpar = timing(joints_parallel,1,n_jobs=nj, the_job=joint_probability_binary)
      #     print('Parallel execution - %s jobs - Finished in %s sec' %(nj,tpar))
      #     ttime+=tpar
      
      print("total time measured by Python",ttime)
      
      

      【讨论】:

        猜你喜欢
        • 2012-02-03
        • 1970-01-01
        • 1970-01-01
        • 2012-08-22
        • 1970-01-01
        • 2010-11-01
        • 1970-01-01
        • 2011-04-21
        相关资源
        最近更新 更多