【问题标题】:numpy create array of the max of consecutive pairs in another arraynumpy 在另一个数组中创建最大连续对的数组
【发布时间】:2020-12-31 22:16:55
【问题描述】:

我有一个 numpy 数组:

A = np.array([8, 2, 33, 4, 3, 6])

我想要创建另一个数组 B,其中每个元素是 A 中 2 个连续对的成对最大值,所以我得到:

B = np.array([8, 33, 33, 4, 6])

关于如何实施的任何想法?
关于如何为超过 2 个元素实现此功能的任何想法? (同样的事情,但对于连续的 n 个元素)

编辑:

答案给了我解决这个问题的方法,但是对于 n 大小窗口的情况,有没有更有效的方法不需要循环?

编辑2:

事实证明,这个问题等同于询问如何对具有大小为 n 的窗口的列表执行 1d 最大池化。 有谁知道如何有效地实现这一点?

【问题讨论】:

    标签: python numpy max-pooling


    【解决方案1】:

    在这个问答中,我们基本上要求滑动最大值。之前已经对此进行了探索 - Max in a sliding window in NumPy array。由于我们希望提高效率,因此我们可以看得更远。其中之一是numba,这是我最终得到的两个最终变体,利用parallel 指令可以提高性能而不是无版本:

    import numpy as np
    from numba import njit, prange
    
    @njit(parallel=True)
    def numba1(a, W):
        L = len(a)-W+1
        out = np.empty(L, dtype=a.dtype)
        v = np.iinfo(a.dtype).min
        for i in prange(L):
            max1 = v
            for j in range(W):
                cur = a[i + j]
                if cur>max1:
                    max1 = cur                
            out[i] = max1
        return out 
    
    @njit(parallel=True)
    def numba2(a, W):
        L = len(a)-W+1
        out = np.empty(L, dtype=a.dtype)
        for i in prange(L):
            for j in range(W):
                cur = a[i + j]
                if cur>out[i]:
                    out[i] = cur                
        return out 
    

    从之前链接的问答中,等效的 SciPy 版本将是 -

    from scipy.ndimage.filters import maximum_filter1d
    
    def scipy_max_filter1d(a, W):
        L = len(a)-W+1
        hW = W//2 # Half window size
        return maximum_filter1d(a,size=W)[hW:hW+L]
    

    基准测试

    其他已发布的通用窗口 arg 工作方法:

    from skimage.util import view_as_windows
    
    def rolling(a, window):
        shape = (a.size - window + 1, window)
        strides = (a.itemsize, a.itemsize)
        return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
    
    # @mathfux's soln
    def npmax_strided(a,n):
        return np.max(rolling(a, n), axis=1)
    
    # @Nicolas Gervais's soln
    def mapmax_strided(a, W):
        return list(map(max, view_as_windows(a,W)))
    
    cummax = np.maximum.accumulate
    def pp(a,w):
        N = a.size//w
        if a.size-w+1 > N*w:
            out = np.empty(a.size-w+1,a.dtype)
            out[:-1] = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-1:-1]
            out[-1] = a[w*N:].max()
        else:
            out = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-2:-1]
        out[1:N*w-w+1] = np.maximum(out[1:N*w-w+1],
                                cummax(a[w:w*N].reshape(N-1,w),axis=1).ravel())
        out[N*w-w+1:] = np.maximum(out[N*w-w+1:],cummax(a[N*w:]))
        return out
    

    使用benchit 包(几个基准测试工具打包在一起;免责声明:我是它的作者)对建议的解决方案进行基准测试。

    import benchit
    funcs = [mapmax_strided, npmax_strided, numba1, numba2, scipy_max_filter1d, pp]
    in_ = {(n,W):(np.random.randint(0,100,n),W) for n in 10**np.arange(2,6) for W in [2, 10, 20, 50, 100]}
    t = benchit.timings(funcs, in_, multivar=True, input_name=['Array-length', 'Window-length'])
    t.plot(logx=True, sp_ncols=1, save='timings.png')
    

    因此,numba 非常适合小于 10 的窗口大小,在这种情况下没有明显的赢家,而在较大的窗口大小上,pp 以 SciPy 赢得第二名。

    【讨论】:

    • 对于小型阵列 len
    【解决方案2】:

    使用Pandas:

    A = pd.Series([8, 2, 33, 4, 3, 6])
    res = pd.concat([A,A.shift(-1)],axis=1).max(axis=1,skipna=False).dropna()
    
    >>res
    0     8.0
    1    33.0
    2    33.0
    3     4.0
    4     6.0
    

    或者使用 numpy:

    np.vstack([A[1:],A[:-1]]).max(axis=0)
    

    【讨论】:

      【解决方案3】:

      一个递归解决方案,适用于所有 n

      import numpy as np
      import sys
      
      
      def recursive(a: np.ndarray, n: int, b=None, level=2):
          if n <= 0 or n > len(a):
              raise ValueError(f'len(a):{len(a)} n:{n}')
          if n == 1:
              return a
          if len(a) == n:
              return np.max(a)
          b = np.maximum(a[:-1], a[1:]) if b is None else np.maximum(a[level - 1:], b)
          if n == level:
              return b
          return recursive(a, n, b[:-1], level + 1)
      
      
      test_data = np.array([8, 2, 33, 4, 3, 6])
      for test_n in range(1, len(test_data) + 2):
          try:
              print(recursive(test_data, n=test_n))
          except ValueError as e:
              sys.stderr.write(str(e))
      

      输出

      [ 8  2 33  4  3  6]
      [ 8 33 33  4  6]
      [33 33 33  6]
      [33 33 33]
      [33 33]
      33
      len(a):6 n:7
      

      关于递归函数

      你可以观察下面的数据,然后你就会知道如何编写递归函数了。

      """
      np.array([8, 2, 33, 4, 3, 6])
      n=2: (8, 2),     (2, 33),    (33, 4),    (4, 3),   (3, 6)  => [8, 33, 33, 4, 6] => B' = [8, 33, 33, 4]
      n=3: (8, 2, 33), (2, 33, 4), (33, 4, 3), (4, 3, 6)         => B' [33, 4, 3, 6]  =>  np.maximum([8, 33, 33, 4], [33, 4, 3, 6]) => 33, 33, 33, 6
      ...
      """
      

      【讨论】:

        【解决方案4】:

        这是一种专门针对较大窗户设计的方法。窗口大小为 O(1),数据大小为 O(n)。

        我已经完成了一个纯 numpy 和一个 pythran 实现。

        我们如何在窗口大小上实现 O(1)?我们使用“锯齿”技巧:如果 w 是窗口宽度,我们将数据分组到许多 w 中,并且对于每个组,我们从左到右和从右到左进行累积最大值。任何中间窗口的元素分布在两组中,并且交叉点的最大值在我们之前计算的累积最大值中。所以我们需要对每个数据点进行总共 3 次比较。

        benchit(感谢@Divakar)w=100;我的函数是 pp (numpy) 和 winmax (pythran):

        对于小窗口尺寸 w=5,图片更均匀。有趣的是,即使对于非常小的尺寸,pythran 仍然具有巨大的优势。他们必须做一些正确的事情来最小化调用开销。

        python 代码:

        cummax = np.maximum.accumulate
        def pp(a,w):
            N = a.size//w
            if a.size-w+1 > N*w:
                out = np.empty(a.size-w+1,a.dtype)
                out[:-1] = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-1:-1]
                out[-1] = a[w*N:].max()
            else:
                out = cummax(a[w*N-1::-1].reshape(N,w),axis=1).ravel()[:w-a.size-2:-1]
            out[1:N*w-w+1] = np.maximum(out[1:N*w-w+1],
                                    cummax(a[w:w*N].reshape(N-1,w),axis=1).ravel())
            out[N*w-w+1:] = np.maximum(out[N*w-w+1:],cummax(a[N*w:]))
            return out
        

        pythran 版本;用pythran -O3 &lt;filename.py&gt;编译;这将创建一个可以导入的已编译模块:

        import numpy as np
        
        # pythran export winmax(float[:],int)
        # pythran export winmax(int[:],int)
        
        def winmax(data,winsz):
            N = data.size//winsz
            if N < 1:
                raise ValueError
            out = np.empty(data.size-winsz+1,data.dtype)
            nxt = winsz
            for j in range(winsz,data.size):
                if j == nxt:
                    nxt += winsz
                    out[j+1-winsz] = data[j]
                else:
                    out[j+1-winsz] = out[j-winsz] if out[j-winsz]>data[j] else data[j]
            running = data[-winsz:N*winsz].max()
            nxt -= winsz << (nxt > data.size)
            for j in range(data.size-winsz,0,-1):
                if j == nxt:
                    nxt -= winsz
                    running = data[j-1]
                else:
                    running = data[j] if data[j] > running else running
                    out[j] = out[j] if out[j] > running else running
            out[0] = data[0] if data[0] > running else running
            return out
        

        【讨论】:

        • 对于仅 NumPy 的版本来说还不错。
        【解决方案5】:

        无循环的解决方案是在skimage.util.view_as_windows创建的窗口上使用max

        list(map(max, view_as_windows(A, (2,))))
        
        [8, 33, 33, 4, 6]
        

        复制/粘贴示例:

        import numpy as np
        from skimage.util import view_as_windows
        
        A = np.array([8, 2, 33, 4, 3, 6])
        
        list(map(max, view_as_windows(A, (2,))))
        

        【讨论】:

        • 这是skimage 的一个非常好的功能。我也很想知道它在numpy 方法方面是否比O(len(A)*n) 更好。
        • 是的,那会很有趣。你为什么不试试呢
        • 我实际上是使用np.minimum.accumulate 完成的。它帮助我获得了一些有趣的见解。例如,我能够以矢量化方式和线性时间列出检查 A[:k] 是否在 A[:k-n] 中具有最小值(其中 n 是窗口的固定大小,k 是可迭代的)。但这还不足以进一步解决问题。
        【解决方案6】:

        如果有连续的n项目,扩展解决方案需要循环:

        np.maximum(*[A[i:len(A)-n+i+1] for i in range(n)])
        

        为了避免这种情况,您可以使用stride tricks 并将A 转换为n-length 块数组:

        def rolling(a, window):
            shape = (a.size - window + 1, window)
            strides = (a.itemsize, a.itemsize)
            return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
        

        例如:

        >>> rolling(A, 3)
        array([[ 8,  2,  8],
           [ 2,  8, 33],
           [ 8, 33, 33],
           [33, 33,  4]])
        

        完成后你可以用np.max(rolling(A, n), axis=1)杀死它。

        尽管它很优雅,无论是这个解决方案还是第一个解决方案都没有效率,因为我们重复地对仅相差两项的相邻块应用最大值。

        【讨论】:

        • 关于如何使这个解决方案无循环且高效的任何建议?也许某种矢量化可以提供帮助?
        • 锻炼需要更多的努力。像np.minimum.accumulate 这样的东西可能会有所帮助。
        • @GalSuchetzky 看来我们需要更深入地了解在连续块中查找min 的算法——这是我自己或互联网上找不到的东西。如果我们在一般情况下考虑这个问题,这是一个非常有趣的问题,所以我认为值得请教专家并在打开此选项时开始对这个问题进行赏金。
        【解决方案7】:

        成对问题的一种解决方案是使用np.maximum 函数和数组切片:

        B = np.maximum(A[:-1], A[1:])
        

        【讨论】:

          猜你喜欢
          • 2021-10-26
          • 2017-12-27
          • 2023-03-22
          • 1970-01-01
          • 2023-02-21
          • 2012-04-04
          • 1970-01-01
          • 2015-07-05
          相关资源
          最近更新 更多