【问题标题】:How to vectorize this python code?如何矢量化这个 python 代码?
【发布时间】:2013-01-05 00:39:22
【问题描述】:

我正在尝试使用 NumPy 和矢量化操作来使一段代码运行得更快。但是,我似乎对如何向量化这段代码有误解(可能是由于对向量化的理解不完整)。

这是带有循环的工作代码(A 和 B 是设定大小的二维数组,已经初始化):

for k in range(num_v):
    B[:] = A[:]
    for i in range(num_v):
        for j in range(num_v):
            A[i][j] = min(B[i][j], B[i][k] + B[k][j])
return A

这是我对上述代码进行矢量化的尝试:

for k in range(num_v):
    B = numpy.copy(A)
    A = numpy.minimum(B, B[:,k] + B[k,:])
return A

为了测试这些,我使用了以下代码,上面的代码包含在一个名为“算法”的函数中:

def setup_array(edges, num_v):
    r = range(1, num_v + 1)
    A = [[None for x in r] for y in r]  # or (numpy.ones((num_v, num_v)) * 1e10) for numpy
    for i in r:
        for j in r:
            val = 1e10
            if i == j:
                val = 0 
            elif (i,j) in edges:
                val = edges[(i,j)]
            A[i-1][j-1] = val 
    return A

A = setup_array({(1, 2): 2, (6, 4): 1, (3, 2): -3, (1, 3): 5, (3, 6): 5, (4, 5): 2, (3, 1): 4, (4, 3): 8, (3, 4): 6, (2, 4): -4, (6, 5): -5}, 6) 
B = []
algorithm(A, B, 6)

预期的结果,以及我使用第一个代码得到的结果是:

[[0, 2, 5, -2, 0, 10] 
 [8, 0, 4, -4, -2, 9]
 [4, -3, 0, -7, -5, 5]
 [12, 5, 8, 0, 2, 13]
 [10000000000.0, 9999999997.0, 10000000000.0, 9999999993.0, 0, 10000000000.0]
 [13, 6, 9, 1, -5, 0]]

第二个(矢量化)函数返回:

[[ 0. -4.  0.  0.  0.  0.]
 [ 0. -4.  0. -4.  0.  0.]
 [ 0. -4.  0.  0.  0.  0.]
 [ 0. -4.  0.  0.  0.  0.]
 [ 0. -4.  0.  0.  0.  0.]
 [ 0. -4.  0.  0. -5.  0.]]

我错过了什么?

【问题讨论】:

    标签: python numpy vectorization


    【解决方案1】:

    通常您希望对代码进行矢量化处理,因为您认为代码运行速度太慢。
    如果您的代码太慢,那么我可以告诉您正确的索引会使其更快。
    而不是A[i][j],你应该写A[i, j]——这样可以避免(子)数组的临时副本。
    由于您在代码的最内层循环中执行此操作,这可能会非常昂贵。

    看这里:

    In [37]: timeit test[2][2]
    1000000 loops, best of 3: 1.5 us per loop
    
    In [38]: timeit test[2,2]
    1000000 loops, best of 3: 639 ns per loop
    

    在您的代码中始终如一地执行此操作 - 我坚信这已经解决了您的性能问题!

    话说回来……

    ...这是我对如何矢量化的看法

    for k in range(num_v):
        numpy.minimum(A, np.add.outer(A[:,k], A[k,:]), A)
    return A
    

    numpy.minimum 将比较两个数组并按元素返回两个元素中较小的一个。如果您传递第三个参数,它将获取输出。如果这是一个输入数组,那么整个操作就到位了。

    正如 Peter de Rivay 解释的那样,您的广播解决方案存在问题 - 但从数学上讲,您想要做的是某种外积,而不是两个向量的相加。因此可以在add函数上使用外层操作。

    NumPy 的二进制 ufunc 具有 special methods 用于执行某些类型的 特殊的向量化操作,如reduce、accumulate、sum和outer。

    【讨论】:

      【解决方案2】:

      问题是行中的数组广播引起的:

      A = numpy.minimum(B, B[:,k] + B[k,:])
      

      B 的大小为 6 x 6,B[:,k] 是一个包含 6 个元素的数组,B[k,:] 是一个包含 6 个元素的数组。

      (因为您使用的是 numpy 数组类型,所以 B[:,k] 和 B[k,:] 都返回形状为 N 的 rank-1 数组)

      Numpy 会自动更改大小以匹配:

      1. 首先将 B[:,k] 添加到 B[k,:] 以生成具有 6 个元素的中间数组结果。 (这不是你想要的)
      2. 第二个这个 6 元素数组通过重复行广播到 6 x 6 矩阵
      3. 计算原始矩阵的第三个最小值并计算此广播矩阵。

      这意味着你的 numpy 代码相当于:

      for k in range(num_v):
         B[:] = A[:]
         C=[B[i][k]+B[k][i] for i in range(num_v)]
         for i in range(num_v):
            for j in range(num_v):
               A[i][j] = min(B[i][j], C[j])
      

      修复代码最简单的方法是使用矩阵类型而不是数组类型:

      A = numpy.matrix(A)
      for k in range(num_v):
          A = numpy.minimum(A, A[:,k] + A[k,:])
      

      矩阵类型使用更严格的广播规则,所以在这种情况下:

      1. A[:,k] 通过重复列扩展为 6 x 6 矩阵
      2. A[k,:] 通过重复行扩展为 6 x 6 矩阵
      3. 广播的矩阵相加形成一个 6 x 6 矩阵
      4. 应用了最小值

      【讨论】:

      • 很好的解释,谢谢!我不敢相信我的代码现在有多快!仅这一更改就使整个代码运行(在不同数据上)从 45 分钟缩短到不到 1 秒。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-07-08
      • 1970-01-01
      • 2021-01-28
      • 2015-12-08
      • 1970-01-01
      • 2018-03-15
      相关资源
      最近更新 更多