【问题标题】:How can I vectorize and speed up this large array calculation?如何矢量化和加速这个大型数组计算?
【发布时间】:2016-09-14 13:46:18
【问题描述】:

我目前正在尝试计算 10.000 x 10.000 值数组中所有子平方和的总和。例如,如果我的数组是:

1 1 1 
2 2 2
3 3 3 

我希望结果是:

1+1+1+2+2+2+3+3+3                        [sum of squares of size 1]
+(1+1+2+2)+(1+1+2+2)+(2+2+3+3)+(2+2+3+3) [sum of squares of size 2]
+(1+1+1+2+2+2+3+3+3)                     [sum of squares of size 3]
________________________________________
68

所以,作为第一次尝试,我编写了一个非常简单的 python 代码来做到这一点。因为它在 O(k^2.n^2) 中(n 是大数组的大小,k 是我们得到的子正方形的大小),处理过程非常长。我在 O(n^2) 中编写了另一个算法来加速它:

def getSum(tab,size):
    n = len(tab)
    tmp = numpy.zeros((n,n))

    for i in xrange(0,n):
        sum = 0
        for j in xrange(0,size):
            sum += tab[j][i]
        tmp[0][i] = sum

        for j in xrange(1,n-size+1):
            sum += (tab[j+size-1][i] - tab[j-1][i])
            tmp[j][i] = sum

    finalsum = 0
    for i in xrange(0,n-size+1):
        sum = 0 
        for j in xrange(0,size):
            sum += tmp[i][j]
        finalsum += sum

        for j in xrange(1,n-size+1):
            finalsum += (tmp[i][j+size-1] - tmp[i][j-1])

return finalsum

所以这段代码可以正常工作。给定一个数组和子正方形的大小,它将返回所有这些子正方形中的值的总和。我基本上会遍历子方块的大小以获取所有可能的值。

问题在于,对于大型阵列(10.000 x 10.000 阵列需要超过 20 天),这又太长了。我用谷歌搜索并了解到我可以使用 numpy 对数组上的迭代进行矢量化。但是,在我的情况下,我无法弄清楚如何做到这一点......

如果有人可以帮助我加快算法速度,或者为我提供有关该主题的良好文档,我会很高兴!

谢谢!

【问题讨论】:

  • 我认为计算矩阵中每个数字的计数次数会得到更好的方法...
  • 请看我的编辑:我得到一个 O(n^2) 算法...

标签: python arrays algorithm numpy vectorization


【解决方案1】:

基于计算每个数字计数多少次的想法,我得出了这个简单的代码:

def get_sum(matrix, n):
    ret = 0
    for i in range(n):
        for j in range(n):
            for k in range(1, n + 1):
                # k is the square size. count is times of the number counted.
                count = min(k, n - k + 1, i + 1, n - i) * min(k, n - k + 1, j + 1, n - j)
                ret += count * matrix[i][j]
    return ret

a = [[1, 1, 1], [2, 2, 2], [3, 3, 3]]

print get_sum(a, 3) # 68

Divakar 的解决方案很棒,但是,我认为我的解决方案可能更有效,至少在渐近时间复杂度方面(O(n^3) 与 Divakar 的 O(n^3logn) 相比)。


我现在得到一个 O(n^2) 的解决方案...

基本上,我们可以得到:

def get_sum2(matrix, n):
    ret = 0
    for i in range(n):
        for j in range(n):
            x = min(i + 1, n - i)
            y = min(j + 1, n - j)
            # k < half
            half = (n + 1) / 2
            for k in range(1, half + 1):
                count = min(k, x) * min(k, y)
                ret += count * matrix[i][j]
            # k >= half
            for k in range(half + 1, n + 1):
                count = min(n + 1 - k, x) * min(n + 1 - k, y)
                ret += count * matrix[i][j]
    return ret

你可以看到sum(min(k, x) * min(k, y))可以在O(1)中计算,当1

所以我们来到了 O(n^2) 代码:

def get_square_sum(n):
    return n * (n + 1) * (2 * n + 1) / 6


def get_linear_sum(a, b):
    return (b - a + 1) * (a + b) / 2


def get_count(x, y, k_end):
    # k <= min(x, y), count is k*k
    sum1 = get_square_sum(min(x, y))

    # k > min(x, y) and k <= max(x, y), count is k * min(x, y)
    sum2 = get_linear_sum(min(x, y) + 1, max(x, y)) * min(x, y)

    # k > max(x, y), count is x * y
    sum3 = x * y * (k_end - max(x, y))

    return sum1 + sum2 + sum3


def get_sum3(matrix, n):
    ret = 0
    for i in range(n):
        for j in range(n):
            x = min(i + 1, n - i)
            y = min(j + 1, n - j)
            half = n / 2

            # k < half
            ret += get_count(x, y, half) * matrix[i][j]
            # k >= half
            ret += get_count(x, y, half + half % 2) * matrix[i][j]

    return ret 

测试:

a = [[1, 1, 1], [2, 2, 2], [3, 3, 3]]
n = 1000
b = [[1] * n] * n
print get_sum3(a, 3) # 68
print get_sum3(b, n) # 33500333666800

您可以将我的 O(n^2) Python 代码重写为 C,我相信它会产生一个非常有效的解决方案...

【讨论】:

  • 尽管 Divakar 的算法具有较大的计算成本,但 scipy 的卷积是在 C 中执行的,而您的循环是用 python 编写的(对于大型矩阵来说要慢几个数量级)。不过,对于 C 解决方案来说,这将是一个不错的方法。
  • @ImanolLuengo 感谢您指出这一点,我更新了我的答案。
  • 相当聪明的idd!为了使它更好,您可以将 halfhalf + half % 2 作为循环外的常量移动!
【解决方案2】:

遵循@Divakar 的出色想法,我建议使用integral images 来加速卷积。如果矩阵非常大,则必须对其进行多次卷积(每个内核大小一次)。使用积分图像(又名求和面积表)可以非常有效地计算多个卷积(或平方内总和的评估)。

一旦计算了积分图像M,区域内所有值的总和(x0, y0) - (x1, y1) 可以通过 4 次算术计算来计算,无论窗口大小如何(图片来自维基百科):

M[x1, y1] - M[x1, y0] - M[x0, y1] + M[x0, y0]

这可以很容易地在 numpy.可以使用cumsum 计算积分图像。按照例子:

tab = np.array([[1, 1, 1], [2, 2, 2], [3, 3, 3]])
M = tab.cumsum(0).cumsum(1) # Create integral images
M = np.pad(M, ((1,0), (1,0)), mode='constant') # pad it with a row and column of zeros

M 用一行和一列零填充以处理第一行(其中x0 = 0y0 = 0)。

然后,给定一个窗口大小W,每个大小为W 的窗口的总和可以用numpy 高效计算并完全矢量化为:

all_sums = M[W:, W:] - M[:-W, W:] - M[W:, :-W] + M[:-W, :-W]

请注意,上面的矢量化操作计算每个窗口的总和,即矩阵的每个 A、B、C 和 D。然后将所有窗口的总和计算为

total = all_sums.sum()

请注意,对于N 不同的大小,与卷积不同,积分图像只需计算一次,因此,代码可以非常高效地编写为:

def get_all_sums(A):
    M = A.cumsum(0).cumsum(1)
    M = np.pad(M, ((1,0), (1,0)), mode='constant')

    total = 0
    for W in range(1, A.shape[0] + 1):
        tmp = M[W:, W:] + M[:-W, :-W] - M[:-W, W:] - M[W:, :-W]
        total += tmp.sum()

    return total

示例的输出:

>>> get_all_sums(tab)
68

将卷积与具有不同大小矩阵的积分图像进行比较的一些时序。 getAllSums 指的是 Divakar 的卷积方法,而get_all_sums 指的是上述基于积分图像的方法:

>>> R1 = np.random.randn(10, 10)
>>> R2 = np.random.randn(100, 100)

1) 使用R1 10x10 矩阵:

>>> %time getAllSums(R1)
CPU times: user 353 µs, sys: 9 µs, total: 362 µs
Wall time: 335 µs
2393.5912717342017

>>> %time get_all_sums(R1)
CPU times: user 243 µs, sys: 0 ns, total: 243 µs
Wall time: 248 µs
2393.5912717342012

2) 使用R2 100x100 矩阵:

>>> %time getAllSums(R2)
CPU times: user 698 ms, sys: 0 ns, total: 698 ms
Wall time: 701 ms
176299803.29826894

>>> %time get_all_sums(R2)
CPU times: user 2.51 ms, sys: 0 ns, total: 2.51 ms
Wall time: 2.47 ms
176299803.29826882

请注意,对于足够大的矩阵,使用积分图像比卷积快 300 倍。

【讨论】:

  • 确实很聪明!
  • @Divakar 积分图像在实践中非常有用,问题是它们只能计算有效的均匀滤波器。其他过滤器可以通过一些技巧近似,但它们变得昂贵。如果没有您的回答,我永远不会想到它们,我的大脑中只有一个过滤器,可以将 uniform filter 转换为 integral images 哈哈,在第一名!
  • 是的,我从来没有处理过这些。所以,这就是为什么对我来说看起来很神奇/聪明的原因。我猜编程遇到瓶颈的地方,数学有帮助:)
【解决方案3】:

这些滑动求和最适合计算为 2D 卷积求和,并且可以使用 scipy's convolve2d 有效计算。因此,对于特定大小,您可以获得总和,就像这样 -

def getSum(tab,size):
    # Define kernel and perform convolution to get such sliding windowed summations
    kernel = np.ones((size,size),dtype=tab.dtype)
    return convolve2d(tab, kernel, mode='valid').sum()

要获得所有大小的总和,我认为就内存和性能效率而言,最好的方法是使用循环来循环所有可能的大小。因此,要获得最终总和,您将拥有 -

def getAllSums(tab):
    finalSum = 0
    for i in range(tab.shape[0]):
        finalSum += getSum(tab,i+1)
    return finalSum

示例运行 -

In [51]: tab
Out[51]: 
array([[1, 1, 1],
       [2, 2, 2],
       [3, 3, 3]])

In [52]: getSum(tab,1) # sum of squares of size 1
Out[52]: 18

In [53]: getSum(tab,2) # sum of squares of size 2
Out[53]: 32

In [54]: getSum(tab,3) # sum of squares of size 3
Out[54]: 18

In [55]: getAllSums(tab) # sum of squares of all sizes
Out[55]: 68

【讨论】:

  • 这很酷。您能否用 Big-Oh 表示法给出复杂性?
  • @Sayakiss scipy 的 2D 卷积 AFAIK 应该在 C 中实现,所以我们可以说在 python 级别说话时它是矢量化的。因此,就 O 表示法而言,获得所有大小的最终总和的整个解决方案应该是 O(n),其中 n 是 tab 的大小,即 tab 中的行数。
  • Amazing.. 你的意思是 getSum 的复杂度是 O(n) 吗?但是tab 中有 n^2 个元素,只需简单扫描tab 就会花费 O(n^2)...我只是无法理解convolve2d 背后的魔力...
  • 这真的很酷。我不是这样想的。我会试试你的解决方案,谢谢!
  • @Sayakiss 认为 scipy 的 convole2D 执行相同的操作,但在 C 中。因此,这将是C 中的 O(n^2),它不能完全表示为 O 表示法,但可以将其视为 python 级别的矢量化操作。但是,让我们假设它是 O(K),所以这就是 getSum。接下来,我们在 getAllSums 处有一个复杂度为 O(n) 的纯 Python 循环。因此,总复杂度变为 O(K*n),其中 n 是tab 中的行数。希望这是有道理的! :)
猜你喜欢
  • 1970-01-01
  • 2021-06-18
  • 2021-01-28
  • 2014-11-06
  • 1970-01-01
  • 1970-01-01
  • 2021-04-04
  • 2014-07-17
  • 1970-01-01
相关资源
最近更新 更多