【问题标题】:Accumulate sliding windows relative to origin相对于原点累积滑动窗口
【发布时间】:2021-07-02 20:07:46
【问题描述】:

我有一个形状为(3,3) 的数组A,可以将其视为形状为(5,) 的未知数组的滑动窗口视图。我想计算形状为(5,) 的数组的倒数。这个的伴随操作将是求和。我的意思是,我想将每个对应窗口中的值与数组中的相关位置累积起来,形状为(5,)。当然,我对这个反函数的预期输出和输入A 不相关,只是普通数组。我有两个例子,希望能更好地解释这一点。

A = np.array([[0, 0, 1],
              [0, 0, 1],
              [0, 0, 1]], dtype=np.float32)

我期望这个输出:

np.array([0, 0, 1, 1, 1])

另一个例子:

A = np.array([[1, 2, 3],
              [2, 3, 4],
              [3, 4, 5]], dtype=np.float32)

我期望这个输出:

np.array([1, 2+2, 3+3+3, 4+4, 5]) = np.array([1, 4, 9, 8, 5])

我的解决方案很慢(结果存储在out

out = np.zeros(5, dtype=np.float32)
windows = np.lib.stride_tricks.as_strided(out, shape=(3,3), strides=(4,4))
for i in np.ndindex(windows.shape):
  windows[i] += A[i]

写一个跨步视图感觉有点笨拙,我相信有更好的解决方案。

有没有办法以向量化的方式编写它,而不需要 for 循环? (也适用于多个维度)

编辑

就更高维度的泛化而言,我有一些情况是从图像(二维数组)中获取窗口,而不是像上面的示例那样从一维数组中获取。对于二维情况,A 可以是大小为3 的窗口。这意味着从形状为(4,4) 的图像(输出)中,窗口A 的形状为(2,2,3,3)

A = np.array([[[[0, 0, 0],
                [0, 1, 0],
                [0, 0, 0]],

               [[0, 0, 0],
                [1, 0, 0],
                [0, 0, 0]]],


              [[[0, 1, 0],
                [0, 0, 0],
                [0, 0, 0]],

               [[1, 0, 0],
                [0, 0, 0],
                [0, 0, 0]]]], dtype=np.float32)

使用Pablo给出的解决方案,我得到以下错误

value array of shape (2,2,3,3)  could not be broadcast to indexing result of shape (2,2)

使用我的步幅解决方案的略微修改版本:

def inverse_sliding_windows(A, window_sz, image_sz):
  out = np.zeros(image_sz, dtype=np.float32)
  windows = np.lib.stride_tricks.sliding_window_view(out, window_sz, writeable=True)
  for i in np.ndindex(windows.shape):
    windows[i] += A[i]

window_sz = (3,3)
image_sz = (4,4)
inverse_sliding_windows(A, window_sz, image_sz)

输出:

array([[0., 0., 0., 0.],
       [0., 4., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]], dtype=float32)

为了澄清,窗口大小和输出形状是预先知道的,请参阅inverse_sliding_windows

【问题讨论】:

  • 我们是在说,在一般情况下,nxn 数组和输出2*n-1
  • 为简单起见,窗口的尺寸相同。所以是的,所有窗口A 的数组将是一个 nxn 数组。这意味着对于 1d 情况,输出的大小为 2*n-1
  • 对于窗口大小为 3 的 2d 案例,我希望它从形状 (2,2,3,3) 变为形状 (4,4)
  • 这是否相当于将矩阵旋转-45度并逐行求和?
  • 矢量化版本并不总能保证更好的运行时间。矩阵旋转方法非常低效。我确实有一个使用 for 循环的 2D 案例的解决方案,我相信它比矩阵旋转和其他天真的向量化的要快得多。话虽如此,您可能必须决定哪个更重要 - 矢量化或运行时间。如果不需要更高维度的泛化,我会在这里发布。否则,您能否将问题简化为更高维度?

标签: python python-3.x numpy vectorization


【解决方案1】:

扩展@Shihao Xu 给出的fast 解决方案,我尝试通过在numpy/core/src/multiarray 中添加np.fast_compiled 函数将其转换为可编译的c 代码:

NPY_NO_EXPORT PyObject *
arr_fast_compiled(PyObject *NPY_UNUSED(self), PyObject *args)
{
    PyObject *list_obj = NULL;
    PyArrayObject *list_arr = NULL, *ans = NULL;

    npy_intp len, ans_size;
    npy_intp i, j, k;
    double *dans, *numbers;

    if (!PyArg_ParseTuple(args, "O", &list_obj)) {
            goto fail;
    }

    list_arr = (PyArrayObject *)PyArray_ContiguousFromAny(list_obj, NPY_DOUBLE, 2, 2);
    if (list_arr == NULL) {
        goto fail;
    }

    len = PyArray_DIM(list_arr, 0);
    numbers = (double *)PyArray_DATA(list_arr);
    ans_size = 2*len-1;

    ans = (PyArrayObject *)PyArray_ZEROS(1, &ans_size, NPY_DOUBLE, 0);
    if (ans == NULL) {
        goto fail;
    }
    dans = (double *)PyArray_DATA(ans);
    NPY_BEGIN_ALLOW_THREADS;
    for (i = 0; i < len; ++i) {
        k = i * len;
        for (j = i; j < i + len; ++j, ++k) {
            dans[j] += numbers[k];
        }
    }
    NPY_END_ALLOW_THREADS;
    Py_DECREF(list_arr);
    return (PyObject *)ans;

fail:
    Py_XDECREF(list_arr);
    Py_XDECREF(ans);
    return NULL;
}

for循环是最重要的:

for (i = 0; i < len; ++i) {
    k = i * len;
    for (j = i; j < i + len; ++j, ++k) {   
        dans[j] += numbers[k];
    }
}

numbers 是输入参数 (A),我们以跨步方式访问 numbersdans 中的元素。在 3x3 示例中,我们有以下 jk 值:

j = [0, 1, 2, 1, 2, 3, 2, 3, 4]
k = [0, 1, 2, 3, 4, 5, 6, 7, 8]

NPY_BEGIN_ALLOW_THREADS 是我看到经常用于其他 numpy 函数的东西,但是当我在没有它的情况下测试它时似乎没有性能差异。

性能类似于just_sum_0

【讨论】:

    【解决方案2】:

    正如我在评论中提到的,矢量化解决方案并不总能保证更好的运行时间。如果您的矩阵很大,您可能更喜欢更有效的方法。矩阵旋转缓慢的原因有几个(虽然很直观),请参阅评论。

    性能对比:

    Solution: Wall time: 61.6 ms
    Rotation: Wall time: 3.32 s
    

    代码(在 jupyter notebook 中测试)

    import numpy as np
    
    def rotate45_and_sum(A):
        n = len(A) 
        x, y = np.meshgrid(np.arange(n), np.arange(n))  # at least doubled the running time
        xn, yn = x + y, n - x + y - 1   # generating xn and yn at least doubled the running time
        M = np.zeros((2*n -1, 2*n -1))  # at least slows down running time by a factor of 4
        M[xn,yn] = A[x,y] # very inefficient indexing strategy
        return M.sum(1)
    
    def solution(A):
        n = A.shape[0]
        retval = np.zeros(2*n-1)
        for i in range(n):
            retval[i:(i+n)] += A[i, :]
        return retval
    
    A = np.random.randn(10000, 10000)
    
    %time solution(A)
    
    %time rotate45_and_sum(A)
    

    在多维情况下:

    def solution(A):
        h,w,x,y = A.shape                # change here
        retval = np.zeros((2*x-w,2*y-h)) # change here
        indices = np.ndindex(w, h)       # change here
        for index in indices:
            slices = tuple()
            for i in range(len(index)):
                slices = slices + (slice(index[i], index[i]+x),) # I assume x = y = ..., you need to change here also if the assumption is not correct
            retval[slices] += A[index] # slices is roughly equal `i:(i+x), j:(j+y)` in your code
        return retval
    

    实际上,我不知道如何根据您的描述计算尺寸(或形状):(。但我认为它可以概括。想法是随时构建slices。所以你需要指定哪些维度对应h, w,哪些对应x, y。我认为这样做并不难。

    参考:Numpy index array of unknown dimensions?


    关于https://stackoverflow.com/a/67341994/14923227

    
    def fast(A):
        n = A.shape[0]
        retval = np.zeros(2*n-1)
        for i in range(n):
            retval[i:(i+n)] += A[i, :]
        print(retval.sum())
        return retval
    
    ##########################
    import threading
    
    class sumThread(threading.Thread):
        def __init__(self, A, mat, threadID, ngroups, size):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.size = size
            self.ngroups = ngroups
            self.mat = mat
            self.A = A
        def run(self):
            begin = (self.size + self.ngroups) // self.ngroups * self.threadID
            end   = min(self.size, (self.size+self.ngroups)//self.ngroups*(self.threadID+1))
            for i in range(begin, end):
                self.mat[self.threadID, i:(i+self.size)] += self.A[i, :]
    
    def faster(A):
        
        num_threads = max(1, A.shape[0] // 4000) 
        mat = np.zeros((num_threads, 2*A.shape[0]-1))
        threads = []
        for i in range(num_threads):
            t = sumThread(A, mat, i, num_threads, A.shape[0])
            t.start()
            threads.append(t)
    
        # Wait for all threads to complete
        for t in threads:
            t.join()
        return np.sum(mat, axis=0)
        
    
    

    大型阵列的性能:

    A = np.random.randn(20000,20000)
    %timeit fast(A)   # 263 ms ± 5.21 ms per loop 
    %timeit faster(A) # 155 ms ± 3.14 ms per loop
    

    fast 中并行化for 循环很简单。但是fast 实际上是最高效的缓存(即使对于 GPU 缓存和内存库),因此是计算它的最快方法。理想情况下,您可以使用 CUDA/OpenCL 并行化代码,因为 GPU 中有更多内核。如果操作正确,运行时间将减少到log(original_fast_time),基数为k,其中k 是您拥有的核心数。

    但是,函数中只有少量计算。因此,内存和 GRAM 之间的数据传输可能占主导地位。 (我没有测试过)

    【讨论】:

    • 谢谢!我不知道编写非矢量化会更快。最终,我不在乎解决方案是否矢量化 - 只有性能和内存消耗很重要。
    • 不客气。许多 numpy 的函数只是语法糖。它有助于考虑它是如何在幕后完成的(由执行实际工作的 C 函数)。有时,适当的矢量化解决方案可以加快计算速度,但不清楚您编写的代码是否足够“合适”。无论哪种情况,分析都可能是您可以依赖的唯一工具。
    • 这是我发布的另一个非常相似的问题的有趣运行时分析:stackoverflow.com/a/67341994/14923227
    • @Kevin 我在这里更新了我的答案。这很简单,但如果您需要更快的速度,它可能会有所帮助。
    • 我在此处发布了使用 fast 解决方案的编译版本的答案。是否可以使用并行 GPU 解决方案来加速它? (忽略 CPU 和 GPU 之间的数据传输开销)
    【解决方案3】:

    IIUC 这里提出的问题相当于将矩阵A 旋转-45 度并逐行求和(至少对于2D 版本)。为了更好地理解我所说的旋转矩阵的含义,请参阅this post

    def rotate45_and_sum(A):
        n = len(A) 
        x, y = np.meshgrid(np.arange(n), np.arange(n)) 
        xn, yn = x + y, n - x + y - 1
        M = np.zeros((2*n -1, 2*n -1)) 
        M[xn,yn] = A[x,y] 
        return M.sum(1)
    
    A = np.array([[0, 0, 1],
                  [0, 0, 1],
                  [0, 0, 1]], dtype=np.float32)
    
    print(rotate45_and_sum(A))
    #[0. 0. 1. 1. 1.]
    
    A = np.array([[1, 2, 3],
                  [2, 3, 4],
                  [3, 4, 5]], dtype=np.float32)
    
    print(rotate45_and_sum(A))
    #[1. 4. 9. 8. 5.]
    

    M 是旋转后的矩阵。

    免责声明:我不知道这是否可以推广到多个维度

    【讨论】:

    • 这真是太好了,谢谢。我对我的帖子进行了编辑,您认为可以为 4 维的 A 做同样的事情吗?
    猜你喜欢
    • 2021-10-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-10
    • 2019-07-01
    • 1970-01-01
    • 2015-08-26
    • 2019-07-30
    相关资源
    最近更新 更多