【问题标题】:Faster way to find same integers in sequence in numpy array在 numpy 数组中按顺序查找相同整数的更快方法
【发布时间】:2020-05-26 15:00:30
【问题描述】:

现在我只是循环使用np.nditer() 并与前一个元素进行比较。有没有更快的(矢量化)方法?

额外的好处是我不必总是走到数组的末尾;只要找到max_len 的序列,我就完成了搜索。

import numpy as np

max_len = 3
streak = 0
prev = np.nan

a = np.array([0, 3, 4, 3, 0, 2, 2, 2, 0, 2, 1])

for c in np.nditer(a):
  if c == prev:
      streak += 1
      if streak == max_len:
          print(c)
          break
  else:
      prev = c
      streak = 1

我想到的替代方案是使用np.diff(),但这只是转移了问题;我们现在正在其结果中寻找一系列零。我也怀疑它会更快,因为它必须计算每个整数的差异,而实际上,序列会在到达列表末尾之前发生。

【问题讨论】:

  • 您对最终/最终预期输出感兴趣的变量是什么?
  • 数组要么包含序列,要么不包含。我有兴趣知道一个序列是否发生,如果发生,它是什么整数。

标签: python arrays numpy loops


【解决方案1】:

您可以使用itertools 包中的groupby

import numpy as np
from itertools import groupby

max_len = 3
best = ()

a = np.array([0, 3, 4, 3, 0, 2, 2, 2, 0, 2, 1])

for k, g in groupby(a):
    tup_g = tuple(g)
    if tup_g==max_len:
        best = tup_g
        break
    if len(tup_g) > len(best):
        best = tup_g

best
# returns:
(2, 2, 2)

【讨论】:

    【解决方案2】:

    假设您正在寻找至少连续出现max_len 次的元素,这是一种基于 NumPy 的方法 -

    m = np.r_[True,a[:-1]!=a[1:],True]
    idx0 = np.flatnonzero(m)
    m2 = np.diff(idx0)>=max_len
    out = None # None for no such streak found case
    if m2.any():
        out = a[idx0[m2.argmax()]]
    

    另一个binary-dilation -

    from scipy.ndimage.morphology import binary_erosion
    
    m = np.r_[False,a[:-1]==a[1:]]
    m2 = binary_erosion(m, np.ones(max_len-1, dtype=bool))
    out = None
    if m2.any():
        out = a[m2.argmax()]
    

    最后,为了完整起见,您还可以查看numba。您现有的代码将按原样工作,直接循环a,即for c in a:

    【讨论】:

      【解决方案3】:

      您可以创建长度为max_length 的子数组,每次向右移动一个位置(如ngrams),并检查一个子数组的总和除以max_length 是否等于第一个元素那个子数组。

      如果这是真的,那么您已经找到长度为max_length 的连续整数序列。

      def get_conseq(array, max_length):
          sub_arrays = zip(*[array[i:] for i in range(max_length)])
          for e in sub_arrays:
              if sum(e) / len(e) == e[0]:
                  print("Found : {}".format(e))
                  return e
          print("Nothing found")
          return []
      

      例如,这个数组[1,2,2,3,4,5]max_length = 2 将像这样被“拆分”: [1,2] [2,2] [2,3] [3,4] [4,5]

      在第二个元素 [2,2] 上,总和为 4,除以 max_length 得到 2,与该子组的第一个元素匹配,然后函数返回。

      如果你愿意这样做,你可以break,而不是像我一样返回。

      您还可以添加一些规则来捕获边缘情况,以使事情变得干净(空数组、max_length 优于数组的长度等)。

      以下是一些示例调用:

      >>> splits([1,2,3,4,5,6], 2)
      Nothing found
      
      >>> splits([1,2,2,3,4,5,6], 3)
      Nothing found
      
      >>> splits([1,2,3,3,3], 3)
      Found : [3, 3, 3]
      
      >>> splits([1,2,2,3,3], 2)
      Found : [2, 2]
      

      希望这会有所帮助!

      【讨论】:

        【解决方案4】:

        我开发了一个numpy-only 可以工作的版本,但是经过测试,我发现它的性能很差,因为它不能利用short-circuiting。既然这是你要求的,我在下面描述它。但是,使用numba 和稍微修改过的代码版本有一个好多 更好的方法。 (请注意,所有这些都返回a 中第一个匹配项的索引,而不是值本身。我发现这种方法更灵活。)

        @numba.jit(nopython=True)
        def find_reps_numba(a, max_len):
            streak = 1
            val = a[0]
            for i in range(1, len(a)):
                if a[i] == val:
                    streak += 1
                    if streak >= max_len:
                        return i - max_len + 1
                else:
                    streak = 1
                    val = a[i]
            return -1
        

        事实证明,这比纯 Python 版本快约 100 倍。

        numpy 版本使用rolling window trickargmax trick。但同样,这甚至比纯 Python 版本慢得多,大约 30 倍。

        def rolling_window(a, window):
            a = numpy.ascontiguousarray(a)  # This approach requires a C-ordered array
            shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
            strides = a.strides + (a.strides[-1],)
            return numpy.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
        
        def find_reps_numpy(a, max_len):
            windows = rolling_window(a, max_len)
            return (windows == windows[:, 0:1]).sum(axis=1).argmax()
        

        我针对第一个函数的非 jitted 版本测试了这两个函数。 (我使用 Jupyter 的 %%timeit 功能进行测试。)

        a = numpy.random.randint(0, 100, 1000000)
        
        %%timeit
        find_reps_numpy(a, 3)
        28.6 ms ± 553 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
        
        %%timeit
        find_reps_orig(a, 3)
        4.04 ms ± 40.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
        
        %%timeit
        find_reps_numba(a, 3)
        8.29 µs ± 89.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
        

        请注意,这些数字可能会有很大差异,具体取决于函数必须搜索的a 的深度。为了更好地估计预期性能,我们可以每次都重新生成一组新的随机数,但是如果不将那一步包含在时序中,就很难做到这一点。因此,为了在这里进行比较,我将生成随机数组所需的时间包括在内而不运行其他任何东西:

        a = numpy.random.randint(0, 100, 1000000)
        9.91 ms ± 129 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
        
        a = numpy.random.randint(0, 100, 1000000)
        find_reps_numpy(a, 3)
        38.2 ms ± 453 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
        
        a = numpy.random.randint(0, 100, 1000000)
        find_reps_orig(a, 3)
        13.7 ms ± 404 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
        
        a = numpy.random.randint(0, 100, 1000000)
        find_reps_numba(a, 3)
        9.87 ms ± 124 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
        

        如您所见,find_reps_numba 速度如此之快,以至于运行numpy.random.randint(0, 100, 1000000) 所需的时间差异要大得多——因此第一次和最后一次测试之间的加速是虚幻的。

        所以这个故事的主要寓意是numpy 解决方案并不总是最好的。有时甚至纯 Python 也更快。在这些情况下,nopython 模式下的numba 可能是迄今为止最好的选择。

        【讨论】:

        • 很好的答案,谢谢。确认很难找到更快的方法来做到这一点。另外IPython中%%timeit的技巧也不错,传统的使用timeit的方法太繁琐了!
        • 我真的应该开始使用perfplot,但是timeit 太有效地吸引了我懒惰的一面。
        猜你喜欢
        • 1970-01-01
        • 2018-07-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-01-05
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多