【问题标题】:Minimizing overhead due to the large number of Numpy dot calls由于大量的 Numpy 点调用而最小化开销
【发布时间】:2020-04-08 09:29:27
【问题描述】:

我的问题如下,我有一个迭代算法,这样在每次迭代时它需要执行几个矩阵-矩阵乘法点(A_iB_i),对于我 = 1 ... k。由于这些乘法是使用 Numpy 的点执行的,我知道他们正在调用 BLAS-3 实现,这非常快。问题是调用的数量很大,结果是我的程序中的一个瓶颈。我想通过制作更少的产品但使用更大的矩阵来最小化所有这些调用的开销。

为简单起见,考虑所有矩阵都是 n x n(通常 n 不大,范围在 1 到 1000 之间)。解决我的问题的一种方法是考虑块对角矩阵 diag(A_i) 并执行下面的乘积。

这只是对函数 dot 的一次调用,但现在程序浪费了很多时间来执行与零的乘法。这个想法似乎不起作用,但它给出了结果 [A_1 B_1, ..., A_k B_k],即所有产品堆叠在一个大矩阵中.

我的问题是,有没有办法通过单个函数调用计算 [A_1 B_1, ..., A_k B_k]?或者更重要的是,我如何计算这些乘积比创建一个 Numpy 点循环更快?

【问题讨论】:

  • 其他人注意到ndot 的调用可能比对数组n 大的一次调用要快。内存管理开销消耗了减少迭代所节省的时间。换一种说法,复杂任务的“几次”迭代实际上可能是最佳的。在这种情况下,除非有可以处理块点的 BLAS 级代码,否则您的原始迭代可能是最快的。
  • @hpaulj 感谢您的评论。你介意提供一些其他人注意到的参考吗?我很感兴趣。
  • n x n 范围从 1 到 1_000 是一个相当大的区域。对于非常小的 n (

标签: performance numpy linear-algebra matrix-multiplication


【解决方案1】:

这取决于矩阵的大小

编辑

对于较大的 nxn 矩阵(大小约为 20),从编译代码调用 BLAS 更快,对于较小的矩阵,自定义 Numba 或 Cython 内核通常更快。

以下方法为给定的输入形状生成自定义点函数。使用这种方法,还可以从与编译器相关的优化(例如循环展开)中受益,这对于小型矩阵尤其重要。

需要注意的是,生成和编译一个内核大约需要 . 1s,因此请确保仅在确实需要时才调用生成器。

生成器函数

def gen_dot_nm(x,y,z):
    #small kernels
    @nb.njit(fastmath=True,parallel=True)
    def dot_numba(A,B):
        """
        calculate dot product for (x,y)x(y,z)
        """
        assert A.shape[0]==B.shape[0]
        assert A.shape[2]==B.shape[1]

        assert A.shape[1]==x
        assert B.shape[1]==y
        assert B.shape[2]==z

        res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
        for ii in nb.prange(A.shape[0]):
            for i in range(x):
                for j in range(z):
                    acc=0.
                    for k in range(y):
                        acc+=A[ii,i,k]*B[ii,k,j]
                    res[ii,i,j]=acc
        return res

    #large kernels
    @nb.njit(fastmath=True,parallel=True)
    def dot_BLAS(A,B):
        assert A.shape[0]==B.shape[0]
        assert A.shape[2]==B.shape[1]

        res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
        for ii in nb.prange(A.shape[0]):
            res[ii]=np.dot(A[ii],B[ii])
        return res

    #At square matices above size 20
    #calling BLAS is faster
    if x>=20 or y>=20 or z>=20:
        return dot_BLAS
    else:
        return dot_numba

使用示例

A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)

dot22=gen_dot_nm(2,2,2)
X=dot22(A,B)
%timeit X3=dot22(A,B)
#5.94 µs ± 21.3 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each) 

旧答案

另一种选择,但需要做更多的工作,是使用一些特殊的 BLAS 实现,它会及时为非常小的矩阵创建 custom kernels,而不是从 C 调用这个内核。

示例

import numpy as np
import numba as nb

#Don't use this for larger submatrices
@nb.njit(fastmath=True,parallel=True)
def dot(A,B):
    assert A.shape[0]==B.shape[0]
    assert A.shape[2]==B.shape[1]

    res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
    for ii in nb.prange(A.shape[0]):
        for i in range(A.shape[1]):
            for j in range(B.shape[2]):
                acc=0.
                for k in range(B.shape[1]):
                    acc+=A[ii,i,k]*B[ii,k,j]
                res[ii,i,j]=acc
    return res

@nb.njit(fastmath=True,parallel=True)
def dot_22(A,B):
    assert A.shape[0]==B.shape[0]
    assert A.shape[1]==2
    assert A.shape[2]==2
    assert B.shape[1]==2
    assert B.shape[2]==2

    res=np.empty((A.shape[0],A.shape[1],B.shape[2]),dtype=A.dtype)
    for ii in nb.prange(A.shape[0]):
        res[ii,0,0]=A[ii,0,0]*B[ii,0,0]+A[ii,0,1]*B[ii,1,0]
        res[ii,0,1]=A[ii,0,0]*B[ii,0,1]+A[ii,0,1]*B[ii,1,1]
        res[ii,1,0]=A[ii,1,0]*B[ii,0,0]+A[ii,1,1]*B[ii,1,0]
        res[ii,1,1]=A[ii,1,0]*B[ii,0,1]+A[ii,1,1]*B[ii,1,1]
    return res

时间安排

A=np.random.rand(1000,2,2)
B=np.random.rand(1000,2,2)

X=A@B
X2=np.einsum("xik,xkj->xij",A,B)
X3=dot_22(A,B) #avoid measurig compilation overhead
X4=dot(A,B)    #avoid measurig compilation overhead

%timeit X=A@B
#262 µs ± 2.55 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit np.einsum("xik,xkj->xij",A,B,optimize=True)
#264 µs ± 3.22 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit X3=dot_22(A,B)
#5.68 µs ± 27.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
%timeit X4=dot(A,B)
#9.79 µs ± 61.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

【讨论】:

  • 我对 Numba 有一些经验,所以这个答案非常受欢迎。给我一些时间试试这个。一个小问题:断言行是否提供了一些加速?为什么?谢谢!
  • @Integral 它可以提高速度,因为它还通知编译器确切的内存布局。但主要原因是避免崩溃。没有边界检查,所以如果你用完边界,Python 将简单地崩溃或函数产生废话(与关闭边界检查的 C 例程或 Cython 的行为相同)
  • 我知道您的 dot_22 更快,因为您通过显式编写每个结果来消除循环。您认为这种方法仍然适用于通用 dot_nn 吗?我的意思是在更快的意义上工作。
  • @Integral 是的,这是可能的。您有多种不同的阵列形状还是只有几种?
【解决方案2】:

您可以将数组堆叠成 (k, n, n) 形状,然后调用 numpy.matmul 或使用 @ 运算符。

例如,

In [18]: A0 = np.array([[1, 2], [3, 4]])                                                                 

In [19]: A1 = np.array([[1, 2], [-3, 5]])                                                                

In [20]: A2 = np.array([[4, 0], [1, 1]])                                                                 

In [21]: B0 = np.array([[1, 4], [-3, 4]])                                                                

In [22]: B1 = np.array([[2, 1], [1, 1]])                                                                 

In [23]: B2 = np.array([[-2, 9], [0, 1]])                                                                

In [24]: np.matmul([A0, A1, A2], [B0, B1, B2])                                                           
Out[24]: 
array([[[-5, 12],
        [-9, 28]],

       [[ 4,  3],
        [-1,  2]],

       [[-8, 36],
        [-2, 10]]])

或者,使用@

In [32]: A = np.array([A0, A1, A2])                                                                      

In [33]: A                                                                                               
Out[33]: 
array([[[ 1,  2],
        [ 3,  4]],

       [[ 1,  2],
        [-3,  5]],

       [[ 4,  0],
        [ 1,  1]]])

In [34]: B = np.array([B0, B1, B2])                                                                      

In [35]: A @ B                                                                                           
Out[35]: 
array([[[-5, 12],
        [-9, 28]],

       [[ 4,  3],
        [-1,  2]],

       [[-8, 36],
        [-2, 10]]])

【讨论】:

  • 谢谢,这似乎可以在一个电话中处理所有产品。这种方法更快吗?
【解决方案3】:

如果您不想浪费时间乘以零,那么您真正想要的是稀疏矩阵。使用来自@WarrenWeckesser 的AB 矩阵:

from scipy import sparse
sparse.block_diag((A0, A1, A2), format = "csr") @ np.concatenate((B0, B1, B2), axis = 0)
Out[]: 
array([[-5, 12],
       [-9, 28],
       [ 4,  3],
       [-1,  2],
       [-8, 36],
       [-2, 10]], dtype=int32)

这可能是大型矩阵的加速。对于较小的人,@max9111 使用numba 可能有正确的想法。

【讨论】:

    猜你喜欢
    • 2016-09-01
    • 2014-09-12
    • 2012-03-15
    • 2013-06-28
    • 2017-11-08
    • 2015-05-22
    • 1970-01-01
    • 1970-01-01
    • 2010-11-21
    相关资源
    最近更新 更多