【问题标题】:Vectorizing min and max of slices possible?可以矢量化切片的最小值和最大值吗?
【发布时间】:2017-09-06 04:42:44
【问题描述】:

假设我有一个 NumPy 整数数组。

arr = np.random.randint(0, 1000, 1000)

我有两个数组lowerupper,分别代表arr 切片的下限和上限。这些间隔是重叠的并且是可变长度的,但是lowersuppers 都保证不会减少。

lowers = np.array([0, 5, 132, 358, 566, 822])
uppers = np.array([45, 93, 189, 533, 800, 923])

我想找到由lowersuppers 定义的arr 的每个切片的最小值和最大值,并将它们存储在另一个数组中。

out_arr = np.empty((lowers.size, 2))

最有效的方法是什么?我担心没有矢量化方法,因为我看不到如何在循环中绕过索引..


我目前的方法很简单

for i in range(lowers.size):
    arr_v = arr[lowers[i]:uppers[i]]
    out_arr[i,0] = np.amin(arr_v)
    out_arr[i,1] = np.amax(arr_v)

这让我得到了想要的结果,比如

In [304]: out_arr
Out[304]: 

array([[  26.,  908.],
       [  18.,  993.],
       [   0.,  968.],
       [   3.,  999.],
       [   1.,  998.],
       [   0.,  994.]])

但这对我的实际数据来说太慢了。

【问题讨论】:

  • 多少个切片,尤其是与arr 的大小相比?好像长短不一?它们可以重叠吗?关于这种迭代的唯一方法是使用accumulatecumsum 例如在某些情况下有效,例如切片和和均值。
  • @hpaulj 是的,它们的长度可变,重叠,对于大约 10^7 个元素的数组,大约有 10^5 个切片。这些也是我从数据库中读取的所有输入,所以我认为在此之前没有任何空间可以“更好地迈出第一步”。
  • np.minimum.reduceat 可能适用。这有点挑剔,因为必须混合上限和下限。同样,最终人们只会将问题减少到具有较小值数组的同​​类问题。但可能仍然值得。
  • @hpaulj 我不太明白accumulate 如何用于数组的这些切片......它不提供运行最大值吗?在使用切片时,我们如何为切片建立左边界?我现在在玩reduceat
  • 这个函数调用了多少次?如果您经常调用它,那么使用 numba 可能是值得的;只需在您直接的解决方案上调用 @jit 即可将我的后 jit 时间从 2.3 秒缩短到 0.08 秒。

标签: python arrays performance numpy


【解决方案1】:

好的,下面是如何使用np.minimum.reduceat 至少缩小原始问题的大小:

lu = np.r_[lowers, uppers]
so = np.argsort(lu)
iso = np.empty_like(so)
iso[so] = np.arange(len(so))
cut = len(lowers)
lmin = np.minimum.reduceat(arr, lu[so])
for i in range(cut):
    print(min(lmin[iso[i]:iso[cut+i]]), min(arr[lowers[i]:uppers[i]]))

# 33 33
# 7 7
# 5 5
# 0 0
# 3 3
# 7 7

这并没有实现摆脱主循环,但至少数据从 1000 个元素的数组减少到 12 个元素的数组。

更新:

@Eric Hansen 自己的解决方案重叠很小。我仍然想指出,如果存在大量重叠,那么将这两种方法结合起来甚至可能是值得的。我没有numba,所以下面只是一个双通道版本,它结合了我的预处理和 Eric 的纯numpy 解决方案,它也可以作为onepass 形式的基准:

import numpy as np
from timeit import timeit

def twopass(lowers, uppers, arr):
    lu = np.r_[lowers, uppers]
    so = np.argsort(lu)
    iso = np.empty_like(so)
    iso[so] = np.arange(len(so))
    cut = len(lowers)
    lmin = np.minimum.reduceat(arr, lu[so])
    return np.minimum.reduceat(lmin, iso.reshape(2,-1).T.ravel())[::2]

def onepass(lowers, uppers, arr):
    mixture = np.empty((lowers.size*2,), dtype=lowers.dtype) 
    mixture[::2] = lowers; mixture[1::2] = uppers
    return np.minimum.reduceat(arr, mixture)[::2]

arr = np.random.randint(0, 1000, 1000)
lowers = np.array([0, 5, 132, 358, 566, 822])
uppers = np.array([45, 93, 189, 533, 800, 923])

print('small')
for f in twopass, onepass:
    print('{:18s} {:9.6f} ms'.format(f.__name__, 
                                     timeit(lambda: f(lowers, uppers, arr),
                                            number=10)*100))

arr = np.random.randint(0, 1000, 10**6)
lowers = np.random.randint(0, 8*10**5, 10**4)
uppers = np.random.randint(2*10**5, 10**6, 10**4)
swap = lowers > uppers
lowers[swap], uppers[swap] = uppers[swap], lowers[swap]


print('large')
for f in twopass, onepass:
    print('{:18s} {:10.4f} ms'.format(f.__name__, 
                                     timeit(lambda: f(lowers, uppers, arr),
                                            number=10)*100))

示例运行:

small
twopass             0.030880 ms
onepass             0.005723 ms
large
twopass               74.4962 ms
onepass             3153.1575 ms

【讨论】:

  • 感谢保罗,这太棒了。我使用您的 reduceat 建议调整了一个解决方案,并自己添加了一个答案,但我将对此进行测试,如果它具有可比性,我会将您的答案标记为已接受,因为您给了我这个想法。
【解决方案2】:

执行会很慢,因为在循环内部,子数组被复制到数组中,然后执行操作。您可以通过单行代码避免整个循环

out_array = np.array([(np.amin(arr[lowers[i]:uppers[i]]),np.amax(arr[lowers[i]:uppers[i]])) for i in range(lowers.size)])

【讨论】:

    【解决方案3】:

    根据 Paul Panzer 对reduceat 的建议,我最初尝试的改进版本是

    mixture = np.empty((lowers.size*2,), dtype=lowers.dtype) 
    mixture[::2] = lowers; mixture[1::2] = uppers
    
    np.column_stack((np.minimum.reduceat(arr, mixture)[::2],
                     np.maximum.reduceat(arr, mixture)[::2]))
    

    在与我的实际数据相当的样本量上,这在我的机器上运行时间为 4.22 毫秒,而我的原始解决方案需要 73 毫秒。

    在我原来的解决方案中使用 Numba 会更快

    from numba import jit
    
    @jit
    def get_res():
        out_arr = np.empty((lowers.size, 2))
        for i in range(lowers.size):
            arr_v = arr[lowers[i]:uppers[i]]
            out_arr[i,0] = np.amin(arr_v)
            out_arr[i,1] = np.amax(arr_v)
        return out_arr
    

    在我的机器上运行 100 微秒。

    【讨论】:

    • 如果有人在这里看到任何可恶的错误,请告诉我!
    • 整洁。我总是忘记使用reduceat 可以倒退。只是出于兴趣,因为那个 numba 解决方案非常快。如果你不介意我问。你的输入有多大?是您在其他地方提到的 10^7 / 10^5 吗?您的段的重叠有多大(例如,段总长度超过数据长度)?
    • 从一个间隔到下一个间隔的重叠大约是每个间隔总长度的 80%......我回过头来说,在进一步进行基准测试时,你的 NumPy 方法比我的间隔重叠更快它出现。标记为已接受:)
    • 很高兴与您做生意;-)
    猜你喜欢
    • 2012-08-27
    • 2017-11-18
    • 1970-01-01
    • 2022-11-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-20
    相关资源
    最近更新 更多