【问题标题】:Find cumsum of subarrays split by indices for numpy array efficiently有效地查找由 numpy 数组的索引拆分的子数组的累积和
【发布时间】:2016-04-04 04:01:50
【问题描述】:

给定一个数组'array'和一组索引'indices',我如何找到通过以矢量化方式沿这些索引拆分数组而形成的子数组的累积和? 为了澄清,假设我有:

>>> array = np.arange(20)
>>> array
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19])
indices = np.arrray([3, 8, 14])

操作应该输出:

array([0, 1, 3, 3, 7, 12, 18, 25, 8, 17, 27, 38, 50, 63, 14, 29, 45, 62, 80, 99])

请注意,数组非常大(100000 个元素),所以我需要一个矢量化的答案。使用任何循环都会大大减慢它的速度。 另外,如果我有同样的问题,但是一个二维数组和对应的索引,我需要对数组中的每一行做同样的事情,我该怎么做?

对于 2D 版本:

>>>array = np.arange(12).reshape((3,4))
>>>array
array([[ 0,  1,  2,  3],
       [ 4,  5,  6,  7],
       [ 8,  9, 10, 11]])
>>> indices = np.array([[2], [1, 3], [1, 2]])

输出将是:

array([[ 0,  1,  3,  3],
       [ 4,  9,  6, 13],
       [ 8, 17, 10, 11]])

澄清一下:每一行都会被拆分。

【问题讨论】:

    标签: python arrays performance numpy vectorization


    【解决方案1】:

    您可以在indices 位置引入原始累积求和数组的微分,以在这些位置创建类似边界的效果,这样当微分数组累积求和时,为我们提供indices-stopped 累积求和 em> 输出。乍一看这可能有点做作,但坚持下去,尝试使用其他示例,希望会有意义!这个想法与this other MATLAB solution. 中应用的想法非常相似,因此,遵循这样的理念,这是一种使用numpy.diffcumulative summation 的方法-

    # Get linear indices
    n = array.shape[1]
    lidx = np.hstack(([id*n+np.array(item) for id,item in enumerate(indices)]))
    
    # Get successive differentiations
    diffs = array.cumsum(1).ravel()[lidx] - array.ravel()[lidx]
    
    # Get previous group's offsetted summations for each row at all 
    # indices positions across the entire 2D array
    _,idx = np.unique(lidx/n,return_index=True)
    offsetted_diffs = np.diff(np.append(0,diffs))
    offsetted_diffs[idx] = diffs[idx]
    
    # Get a copy of input array and place previous group's offsetted summations 
    # at indices. Then, do cumulative sum which will create a boundary like 
    # effect with those offsets at indices positions.
    arrayc = array.copy()
    arrayc.ravel()[lidx] -= offsetted_diffs
    out = arrayc.cumsum(1)
    

    这应该是一个几乎矢量化的解决方案,几乎是因为即使我们在循环中计算线性索引,但由于它不是这里的计算密集部分,所以它对总运行时间的影响是最小。此外,如果您不关心破坏输入以节省内存,则可以将 arrayc 替换为 array

    样本输入、输出-

    In [75]: array
    Out[75]: 
    array([[ 0,  1,  2,  3,  4,  5,  6,  7],
           [ 8,  9, 10, 11, 12, 13, 14, 15],
           [16, 17, 18, 19, 20, 21, 22, 23]])
    
    In [76]: indices
    Out[76]: array([[3, 6], [4, 7], [5]], dtype=object)
    
    In [77]: out
    Out[77]: 
    array([[ 0,  1,  3,  3,  7, 12,  6, 13],
           [ 8, 17, 27, 38, 12, 25, 39, 15],
           [16, 33, 51, 70, 90, 21, 43, 66]])
    

    【讨论】:

    • 这对二维数组有用吗?索引会保持不变吗?
    • @Aditya369 在问题中添加 2D 样例和预期输出?
    • 我确实添加了那个。你想让我再添加一个吗?
    • @Aditya369 在您的实际 2D 案例中,indices 中的 2D 数组的形状和元素数量是多少?
    • 300*100000 是数组的形状,每行的索引数在 3000 到 20000 之间,因此总共有 900000 到 6000000 个索引。
    【解决方案2】:

    您可以使用np.split 沿索引拆分数组,然后使用python 内置函数mapnp.cumsum() 应用于您的子数组。最后使用np.hstack 将结果转换为集成数组:

    >>> np.hstack(map(np.cumsum,np.split(array,indices)))
    array([ 0,  1,  3,  3,  7, 12, 18, 25,  8, 17, 27, 38, 50, 63, 14, 29, 45,
           62, 80, 99])
    

    注意,由于map 是python 中的内置函数,并且一直是implemented in C inside the Python interpreter,因此它的性能将优于常规循环。1

    这里是二维数组的替代方案:

    >>> def func(array,indices):
    ...     return np.hstack(map(np.cumsum,np.split(array,indices)))
    ... 
    >>> 
    >>> array
    array([[ 0,  1,  2,  3],
           [ 4,  5,  6,  7],
           [ 8,  9, 10, 11]])
    >>> 
    >>> indices
    array([[2], [1, 3], [1, 2]], dtype=object)
    
    >>> np.array([func(arr,ind) for arr,ind in np.array((array,indices)).T])
    array([[ 0,  1,  2,  5],
           [ 4,  5, 11,  7],
           [ 8,  9, 10, 21]])
    

    请注意,您的预期输出并非基于 np.split 的工作方式。

    如果你想要这样的结果,你需要在你的索引中加 1:

    >>> indices = np.array([[3], [2, 4], [2, 3]], dtype=object)
    >>> 
    >>> np.array([func(arr,ind) for arr,ind in np.array((array,indices)).T])
    array([[  0.,   1.,   3.,   3.],
           [  4.,   9.,   6.,  13.],
           [  8.,  17.,  10.,  11.]])
    

    由于有评论说使用生成器表达式和map 函数之间没有性能差异,我运行了一个基准测试来更好地展示结果。

    # Use map
    ~$ python -m timeit --setup "import numpy as np;array = np.arange(20);indices = np.array([3, 8, 14])" "np.hstack(map(np.cumsum,np.split(array,indices)))"
    10000 loops, best of 3: 72.1 usec per loop
    # Use generator expression
    ~$ python -m timeit --setup "import numpy as np;array = np.arange(20);indices = np.array([3, 8, 14])" "np.hstack(np.cumsum(a) for a in np.split(array,indices))"
    10000 loops, best of 3: 81.2 usec per loop
    

    请注意,这并不意味着使用以 C 速度执行的 map 会使代码以 C 速度执行。正因为如此,代码已经在 python 中实现,调用函数(第一个参数)并将其应用于可迭代项需要时间。

    【讨论】:

    • 这太棒了!!!!这让事情对我来说好多了。有什么方法可以对 2D 矩阵执行此操作吗?
    • @Aditya369 你想如何用indices 数组分割一个二维矩阵?您要拆分所有行还是?
    • 我有一个数组数组,如下所示: indices = np.ndarray([[3, 8, 14], [4, 9, 17]]) 对应于一个有两行的数组。所以 [3, 8, 14] 是第一行,[4, 9, 17] 是第二行。
    • 声称map“以 C 速度执行” 非常具有误导性。 np.hstack(map(np.cumsum,np.split(array,indices))) 和常规 for 循环或生成器表达式之间的性能基本上没有差异,例如np.hstack(np.cumsum(a) for a in np.split(array, indices)).
    • 即使对于具有 100 个索引的 1000000 长数组,我发现使用 map 和生成器表达式之间的运行时间差异约为 13 毫秒。这相当于大约 14% 的加速——几乎没有“Python 速度”和“C 速度”之间的差异。按照你的逻辑,任何只使用 CPython 内置函数的 Python 代码都应该“以 C 速度”运行,这显然是荒谬的。
    猜你喜欢
    • 2011-04-06
    • 2015-11-27
    • 1970-01-01
    • 2017-04-02
    • 1970-01-01
    • 2021-06-29
    • 2013-08-31
    • 1970-01-01
    相关资源
    最近更新 更多