【Question title】: Roll rows of a matrix independently
【Posted】: 2013-12-20 01:44:10
【Question】:

I have a matrix (a 2D numpy ndarray, to be precise):

A = np.array([[4, 0, 0],
              [1, 2, 3],
              [0, 0, 5]])

I want to roll each row of A independently, according to the roll values in another array:

r = np.array([2, 0, -1])

That is, I want to do this:

print(np.array([np.roll(row, x) for row, x in zip(A, r)]))

[[0 0 4]
 [1 2 3]
 [0 5 0]]

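Is there a way to do this efficiently? Perhaps using fancy indexing tricks?

【Comments】:

  • It's kind of interesting that np.roll doesn't accept a numpy array as the shift input.

Tags: python performance numpy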


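【Answer 1】:

Sure, you can do it using advanced indexing. Whether it is the fastest way probably depends on your array size (if your rows are large, it may not be):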

rows, column_indices = np.ogrid[:A.shape[0], :A.shape[1]]

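# Make every shift non-negative so that column_indices - r stays within
# [-n, n), which NumPy accepts as valid (possibly negative) indices.
# (A modulo operation would also work. Note that this modifies r in place.)
r[r < 0] += A.shape[1]
column_indices = column_indices - r[:, np.newaxis]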

result = A[rows, column_indices]
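
A minimal sketch of the modulo variant mentioned in the code comment above, assuming integer shifts of any magnitude. The name `roll_rows` is made up for illustration; unlike the snippet above, it does not modify `r`.

```python
import numpy as np

def roll_rows(A, r):
    """Roll each row of a 2D array by its own shift (hypothetical helper)."""
    A = np.asarray(A)
    r = np.asarray(r)
    n = A.shape[1]
    rows = np.arange(A.shape[0])[:, None]          # shape (m, 1)
    # Output column k of row i reads from input column (k - r[i]) mod n
    cols = (np.arange(n)[None, :] - r[:, None]) % n
    return A[rows, cols]

A = np.array([[4, 0, 0],
              [1, 2, 3],
              [0, 0, 5]])
r = np.array([2, 0, -1])
result = roll_rows(A, r)
print(result)
```

Taking the modulo also covers shifts larger than the row length, which the `r[r < 0] += A.shape[1]` correction alone does not.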

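【Comments】:

  • roll effectively constructs column_indices as np.array([concatenate((arange(n - shift, n), arange(n - shift))) for shift in r]) (after r has been "corrected" for negative values). The indices are the same (possibly with a %= 3 correction).
【Answer 2】: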

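numpy.lib.stride_tricks.as_strided strikes again (abbreviation pun intended)!

Speaking of fancy indexing tricks, there is the infamous np.lib.stride_tricks.as_strided. The idea is to take a slice from the first column through the second-to-last column and concatenate it onto the end of the array. That lets us stride forward as far as needed with np.lib.stride_tricks.as_strided, so no actual rolling is required. That's the whole idea!

As for the actual implementation, we use scikit-image's view_as_windows, which wraps np.lib.stride_tricks.as_strided under the hood. The final implementation would be: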

import numpy as np
from skimage.util.shape import view_as_windows as viewW

def strided_indexing_roll(a, r):
    # Append all but the last column so every possible roll is a contiguous window
    a_ext = np.concatenate((a,a[:,:-1]),axis=1)

    # Get sliding windows; use advanced-indexing to select appropriate ones
    n = a.shape[1]
    return viewW(a_ext,(1,n))[np.arange(len(r)), (n-r)%n,0]

Here's a sample run:

In [327]: A = np.array([[4, 0, 0],
     ...:               [1, 2, 3],
     ...:               [0, 0, 5]])

In [328]: r = np.array([2, 0, -1])

In [329]: strided_indexing_roll(A, r)
Out[329]: 
array([[0, 0, 4],
       [1, 2, 3],
       [0, 5, 0]])
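
For readers without scikit-image, here is a sketch of the same idea using NumPy's own `sliding_window_view` (available in NumPy 1.20 and later, as far as I know). The name `window_roll` is hypothetical and not part of the answer above.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def window_roll(a, r):
    """Roll each row of a 2D array independently via sliding windows."""
    a = np.asarray(a)
    r = np.asarray(r)
    n = a.shape[1]
    # Extended array: a_ext[i, p] == a[i, p % n] for p in [0, 2n - 1)
    a_ext = np.concatenate((a, a[:, :-1]), axis=1)
    # windows[i, j, :] is a_ext[i, j:j + n], shape (m, n, n), no data copied
    windows = sliding_window_view(a_ext, n, axis=1)
    # The window starting at (n - r) % n is the row rolled right by r
    return windows[np.arange(a.shape[0]), (n - r) % n]

A = np.array([[4, 0, 0],
              [1, 2, 3],
              [0, 0, 5]])
r = np.array([2, 0, -1])
rolled = window_roll(A, r)
print(rolled)
```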

Benchmarking

# @seberg's solution
def advindexing_roll(A, r):
    rows, column_indices = np.ogrid[:A.shape[0], :A.shape[1]]    
    r[r < 0] += A.shape[1]
    column_indices = column_indices - r[:,np.newaxis]
    return A[rows, column_indices]

Let's benchmark on an array with a large number of rows and columns:

In [324]: np.random.seed(0)
     ...: a = np.random.rand(10000,1000)
     ...: r = np.random.randint(-1000,1000,(10000))

# @seberg's solution
In [325]: %timeit advindexing_roll(a, r)
10 loops, best of 3: 71.3 ms per loop

#  Solution from this post
In [326]: %timeit strided_indexing_roll(a, r)
10 loops, best of 3: 44 ms per loop

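【Comments】:

  • Nice one! It would be worthwhile to discuss the memory implications of this approach, though. scikit-image warns about view_as_windows when working with arrays of more than 2 dimensions.
【Answer 3】: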
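
A rough way to look at the memory question raised in this comment, sketched with NumPy's `sliding_window_view` standing in for `view_as_windows` (an assumption; the two are not the same function). The extended array is a real copy of almost twice the input size, the window array is only a view, and the final advanced-indexing step allocates the result.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

a = np.arange(12, dtype=np.int64).reshape(3, 4)
n = a.shape[1]

a_ext = np.concatenate((a, a[:, :-1]), axis=1)    # real copy, shape (3, 7)
windows = sliding_window_view(a_ext, n, axis=1)   # view, shape (3, 4, 4)

print(a.nbytes, a_ext.nbytes)                     # input size vs. extended copy
print(windows.shape, np.shares_memory(windows, a_ext))

r = np.array([1, 0, -2])
out = windows[np.arange(len(r)), (n - r) % n]     # advanced indexing copies
print(np.shares_memory(out, a_ext))
```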

If you want a more general solution (handling any shape and any axis), I modified @seberg's solution:

import numpy as np

def indep_roll(arr, shifts, axis=1):
    """Apply an independent roll to each slice along a single axis.

    Parameters
    ----------
    arr : np.ndarray
        Array of any shape.

    shifts : np.ndarray
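        Number of positions to shift each slice by. One entry per index of the
        second-to-last axis once `axis` has been swapped to the end; for a 2D
        array with `axis=1` that is shape `(arr.shape[0],)`.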

    axis : int
        Axis along which elements are shifted. 
    """
    arr = np.swapaxes(arr,axis,-1)
    # Wrap in list() so the last index array can be replaced below
    all_idcs = list(np.ogrid[tuple(slice(0, n) for n in arr.shape)])

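    # Reduce shifts modulo the axis length so they are non-negative
    # (this also avoids modifying the caller's array in place)
    shifts = np.asarray(shifts) % arr.shape[-1]
    all_idcs[-1] = all_idcs[-1] - shifts[:, np.newaxis]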

    result = arr[tuple(all_idcs)]
    arr = np.swapaxes(result,-1,axis)
    return arr
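
An alternative sketch of the same generalization, using `np.take_along_axis` instead of `np.ogrid`. This is not the code above: the name `roll_along_axis` is hypothetical, and it assumes `shifts` has the shape of `arr` with `axis` removed (so one shift per 1D slice along `axis`).

```python
import numpy as np

def roll_along_axis(arr, shifts, axis=1):
    """Roll every 1D slice along `axis` by its own shift (hypothetical helper)."""
    arr = np.asarray(arr)
    shifts = np.asarray(shifts)
    n = arr.shape[axis]
    # Positions 0..n-1 laid out along `axis`, size 1 everywhere else
    idx_shape = [1] * arr.ndim
    idx_shape[axis] = n
    base = np.arange(n).reshape(idx_shape)
    # Output position k reads from input position (k - shift) mod n
    idx = (base - np.expand_dims(shifts, axis)) % n
    return np.take_along_axis(arr, idx, axis=axis)

A = np.array([[4, 0, 0],
              [1, 2, 3],
              [0, 0, 5]])
r = np.array([2, 0, -1])
by_rows = roll_along_axis(A, r, axis=1)   # each row gets its own shift
by_cols = roll_along_axis(A, r, axis=0)   # each column gets its own shift
print(by_rows)
print(by_cols)
```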

【Comments】:

    【Answer 4】:

    I implemented a pure numpy.lib.stride_tricks.as_strided solution as follows:

    import numpy as np
    from numpy.lib.stride_tricks import as_strided
    
    def custom_roll(arr, r_tup):
        m = np.asarray(r_tup)
        arr_roll = arr[:, [*range(arr.shape[1]),*range(arr.shape[1]-1)]].copy() #need `copy`
        strd_0, strd_1 = arr_roll.strides
        n = arr.shape[1]
        result = as_strided(arr_roll, (*arr.shape, n), (strd_0 ,strd_1, strd_1))
    
        return result[np.arange(arr.shape[0]), (n-m)%n]
    
    A = np.array([[4, 0, 0],
                  [1, 2, 3],
                  [0, 0, 5]])
    
    r = np.array([2, 0, -1])
    
    out = custom_roll(A, r)
    
    Out[789]:
    array([[0, 0, 4],
           [1, 2, 3],
           [0, 5, 0]])
    

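    【Comments】:

      【Answer 5】:

      Building on divakar's excellent answer, you can easily apply this logic to a 3D array (which is the problem that brought me here in the first place). Here's an example; basically, flatten your data, roll it, and reshape it afterwards: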

      import numpy as np
      from skimage.util.shape import view_as_windows as viewW

      def applyroll_30(cube, threshold=25, offset=500):
          flattened_cube = cube.copy().reshape(cube.shape[0]*cube.shape[1], cube.shape[2])
      
          roll_matrix = calc_roll_matrix_flattened(flattened_cube, threshold, offset)
      
          rolled_cube = strided_indexing_roll(flattened_cube, roll_matrix, cube_shape=cube.shape)
      
          rolled_cube = rolled_cube.reshape(cube.shape[0], cube.shape[1], cube.shape[2])
          return rolled_cube
      
      
      def calc_roll_matrix_flattened(cube_flattened, threshold, offset):
          """ Calculates the number of positions along the time axis by which we
              need to shift elements in order to trigger the data.
              We return a 1D numpy array of X*Y elements.
          """
      
          # argmax(...) finds the first position along the time axis where we are above threshold
          roll_matrix = np.argmax(cube_flattened > threshold, axis=1) + offset
          # ensure we don't have index out of bound
          roll_matrix[roll_matrix>cube_flattened.shape[1]] = cube_flattened.shape[1]          
          return roll_matrix
      
      
      
      def strided_indexing_roll(cube_flattened, roll_matrix_flattened, cube_shape):
          # Negate the shifts; otherwise we shift in the wrong direction for my application
          roll_matrix_flattened = -1 * roll_matrix_flattened
      
          # Concatenate with a slice of the array to cover all rolls
          a_ext = np.concatenate((cube_flattened, cube_flattened[:, :-1]), axis=1)
      
          # Get sliding windows; use advanced-indexing to select appropriate ones
          n = cube_flattened.shape[1]
          result = viewW(a_ext,(1,n))[np.arange(len(roll_matrix_flattened)), (n - roll_matrix_flattened) % n, 0]
          result = result.reshape(cube_shape)
          return result
      

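      Divakar's answer doesn't do justice to how much more efficient this is on a large data cube. I timed it on 400x400x2000 data stored as int8. An equivalent for loop takes ~5.5 seconds, Seberg's answer ~3.0 seconds, and strided_indexing.... ~0.5 seconds.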
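
The flatten, roll, reshape pattern on its own, as a small sketch. It swaps the strided windows for plain modulo indexing to stay dependency-free, so it says nothing about the timings above; `roll_last_axis` and its `shifts` argument of shape (X, Y) are assumptions made for illustration.

```python
import numpy as np

def roll_last_axis(cube, shifts):
    """Roll each (x, y) time series of a 3D cube by its own shift."""
    x, y, t = cube.shape
    flat = cube.reshape(x * y, t)                  # 2D view: one row per series
    flat_shifts = np.asarray(shifts).reshape(x * y)
    # Output column k of each row reads from input column (k - shift) mod t
    cols = (np.arange(t)[None, :] - flat_shifts[:, None]) % t
    rolled = flat[np.arange(x * y)[:, None], cols] # new array, input untouched
    return rolled.reshape(x, y, t)

cube = np.arange(24).reshape(2, 3, 4)
shifts = np.array([[0, 1, 2],
                   [-1, 5, 3]])
rolled = roll_last_axis(cube, shifts)
print(rolled)
```

Because both advanced indexing and np.concatenate return new arrays, the input cube is not modified here, so an up-front `.copy()` is probably not needed in this variant.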

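      【Comments】:

        【Answer 6】:

        By using a fast Fourier transform, we can apply the shift in the frequency domain and then use the inverse fast Fourier transform to obtain the shifted rows.

        So this is a pure numpy solution that takes only one line: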

        import numpy as np
        from numpy.fft import fft, ifft
        
        # The row shift function using the fast Fourier transform
        #   rshift(A,r) where A is a 2D array, r the row shift vector
        def rshift(A,r):
             return np.real(ifft(fft(A,axis=1)*np.exp(2*1j*np.pi/A.shape[1]*r[:,None]*np.r_[0:A.shape[1]][None,:]),axis=1).round())
        

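        This applies a left shift, but we can simply negate the exponent of the exponential to turn the function into a right-shift function: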

        ifft(fft(...)*np.exp(-2*1j...)
        

        It can be used like this:

        # Example:
        
        A = np.array([[1,2,3,4],
                      [1,2,3,4],
                      [1,2,3,4]])
        
        r = np.array([1,-1,3])
        
        print(rshift(A,r))
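
A more spelled-out sketch of the same shift-theorem idea, checked against np.roll. Multiplying the k-th Fourier coefficient of a length-n row by exp(2πi·k·r/n) shifts the row left by r. The name `fft_roll_left` is hypothetical, and because of the final rounding this only suits integer-valued data.

```python
import numpy as np

def fft_roll_left(A, r):
    """Shift row i of A left by r[i] using the DFT shift theorem."""
    A = np.asarray(A)
    r = np.asarray(r)
    n = A.shape[1]
    k = np.arange(n)                                   # frequency indices
    phase = np.exp(2j * np.pi * r[:, None] * k[None, :] / n)
    shifted = np.fft.ifft(np.fft.fft(A, axis=1) * phase, axis=1)
    # Drop the tiny imaginary part and round away floating-point noise
    return np.real(shifted).round()

A = np.array([[1, 2, 3, 4],
              [1, 2, 3, 4],
              [1, 2, 3, 4]])
r = np.array([1, -1, 3])
out = fft_roll_left(A, r)
# A left shift by r matches np.roll with a shift of -r
expected = np.array([np.roll(row, -s) for row, s in zip(A, r)])
print(out)
print(np.array_equal(out, expected))
```

Note that the result is a float array, since it comes back from the inverse transform.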
        

        【Comments】:
