【问题标题】:Multiple substring searches in numpy array with few unique valuesnumpy数组中的多个子字符串搜索具有很少的唯一值
【发布时间】:2020-05-18 08:35:13
【问题描述】:

灵感来自this 问题:假设我有多个一维数组numpyxs 的列表,我想知道有多少作为另一个更大的一维数组numpy 的“子字符串”出现y .

我们可以假设数组包含整数,如果a == b[p:q] 用于某些整数pq,则ab 的子字符串。

我提出的解决方案使用 Python 的 bytes 对象的 in 运算符,但我认为如果 xs 有很多元素,它是低效的:

import numpy as np

N = 10_000    # number of arrays to search
M = 3         # "alphabet" size 
K = 1_000_000 # size of the target array

xs = [np.random.randint(0, M, size=7) for _ in range(N)]
y = np.random.randint(0, M, size=K)

y_bytes = y.tobytes()
%time num_matches = sum(1 for x in xs if x.tobytes() in y_bytes)
# CPU times: user 1.03 s, sys: 17 µs, total: 1.03 s
# Wall time: 1.03 s

如果M 很大(y 中的任何xs 中的可能值的数量)很大,那么我想几乎无法加快速度。但是,对于小的 M 我想使用 trie 或类似的东西可能会有所帮助。有没有一种在 Python 中实现这一点的有效方法,可能使用numpy/numba

【问题讨论】:

  • xs 中的所有数组的长度是否相同?
  • @Divakar 当然,假设xs 中的所有数组都具有相同的长度是可以的。

标签: python-3.x performance numpy substring


【解决方案1】:

对于小的M's,我们可以根据其中的整数组合为每个xs 分配一个唯一的标签。同样,我们可以利用 scaling 数组的卷积,从而将 xs 中的每一个减少为一个标量。最后,我们使用匹配方法来检测和计数。

唯一的开销是从数组列表转换为数组。因此,如果事先优化列表创建本身以拥有一个数组,这将对最终的性能数据有很大帮助。

实现看起来像这样 -

x = np.asarray(xs) # convert to array, if not already done

s = M**np.arange(x.shape[1])
yr = np.convolve(y,s[::-1])
xr = x.dot(s)

# Final step : Match and get count
N = np.maximum(xr.max(),yr.max())+1 # or use s[-1]*M if M is small enough
l = np.zeros(N, dtype=bool)
l[yr] = True
count = l[xr].sum()

替代执行Final step

备选方案#1:

sidx = yr.argsort()
idx = np.searchsorted(yr,xr,sorter=sidx)
idx[idx==len(yr)] = 0
count = (yr[sidx[idx]] == xr).sum()

备选方案#2:

from scipy.sparse import csr_matrix

ly = len(yr)
y_extent = yr.max()+1  # or use s[-1]*M if M is small enough
r = np.zeros(ly, dtype=np.uint64)
val = np.ones(ly, dtype=np.bool)
sp_mat = csr_matrix((val, (r,yr)), shape=(1,y_extent))
count = sp_mat[:,xr].sum()

备选方案#3:

对于更大的M 数字,我们可以使用empty 数组代替-

l = np.empty(N, dtype=bool)
l[xr] = False
l[yr] = True
count = l[xr].sum()

进一步挖掘(利用 numbaconvolution

对主要提议解决方案的分析表明,1D 卷积部分是耗时的部分。更进一步,我们看到1D 卷积码有一个特定的内核,它本质上是几何的。这可以在每次迭代重用边界元素时在O(n) 中实现。请注意,与之前提出的内核相比,这基本上是一个倒置内核。因此,将所有这些更改放在一起,我们最终会得到这样的结果 -

from numba import njit

@njit
def numba1(y, conv_out, M, L, N):
    A = M**L
    for i in range(1,N):
        conv_out[i] = conv_out[i-1]*M + y[i+L-1] - y[i-1]*A
    return conv_out

def numba_convolve(y, M, L):
        N = len(y)-L+1
        conv_out = np.empty(N, dtype=int)
        conv_out[0] = y[:L].dot(M**np.arange(L-1,-1,-1))
        return numba1(y, conv_out, M, L, N)

def intersection_count(xs, y):
    x = np.asarray(xs) # convert to array, if not already done

    L = x.shape[1]
    s = M**np.arange(L-1,-1,-1)
    xr = x.dot(s)

    yr_numba = numba_convolve(y, M=M, L=L)

    # Final step : Match and get count
    N = s[0]*M
    l = np.zeros(N, dtype=bool)
    l[yr_numba] = True
    count = l[xr].sum()
    return count

基准测试

我们将重新使用问题中的设置。

In [42]: %%timeit
    ...: y_bytes = y.tobytes()
    ...: p = sum(1 for x in xs if x.tobytes() in y_bytes)
927 ms ± 3.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [43]: %timeit intersection_count(xs, y)
7.55 ms ± 71.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

如前所述,转换为数组可能是瓶颈。所以,让我们也为那部分计时 -

In [44]: %timeit np.asarray(xs)
3.41 ms ± 10.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

因此,数组转换部分大约占总运行时间的45%,这是一个相当大的部分。因此,在这一点上,使用 2D 数组而不是 1D 数组列表的建议变得至关重要。好处是数组数据为我们提供了矢量化能力,从而提高了整体性能。只是为了强调 2D 阵列的可用性,这里是有和没有的加速 -

In [45]: 927/7.55
Out[45]: 122.78145695364239

In [46]: 927/(7.55-3.41)
Out[46]: 223.91304347826087

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-11-23
    • 1970-01-01
    • 2014-10-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-04-02
    相关资源
    最近更新 更多