【问题标题】:Grouping indices of unique elements in numpynumpy中唯一元素的分组索引
【发布时间】:2014-06-09 17:18:13
【问题描述】:

我有许多包含许多重复项的大型 (>100,000,000) 整数列表。我想获取每个元素出现的索引。目前我正在做这样的事情:

import numpy as np
from collections import defaultdict

a = np.array([1, 2, 6, 4, 2, 3, 2])
d=defaultdict(list)
for i,e in enumerate(a):
    d[e].append(i)

d
defaultdict(<type 'list'>, {1: [0], 2: [1, 4, 6], 3: [5], 4: [3], 6: [2]})

这种遍历每个元素的方法非常耗时。有没有一种有效的或矢量化的方式来做到这一点?

编辑1 我在下面尝试了 Acorbe 和 Jaime 的方法

a = np.random.randint(2000, size=10000000)

结果是

original: 5.01767015457 secs
Acorbe: 6.11163902283 secs
Jaime: 3.79637312889 secs

【问题讨论】:

标签: python python-2.7 numpy


【解决方案1】:

我知道这是一个老问题,但我最近在做一个类似的事情,其中​​性能至关重要,所以我对时间进行了广泛的实验。我希望我的发现对社区有益。

Jaime's solution 基于np.unique 是 Python 中最快的算法,但有一个警告:索引没有排序(因为 numpy 使用 quicksort by default)和结果不同于 OP 的原始算法(以下称为 naive)。使用 stable 选项可以修复它,但它会减慢速度。

使用 Python 的内置 array 模块可以改进幼稚的方法,如下所示:

import array
from collections import defaultdict

a = np.array(...)  # 1D, int array
d = defaultdict(lambda: array.array("L"))
alist = array.array("L")
alist.frombytes(a.tobytes())
for n in range(len(alist)):
    d[alist[n]].append(n)

它只是比 Jaime 的稳定排序解决方案慢几分之一。

这是在我的平台上使用 Python 3 完成的一些测试

Best of 5
Naive method: 0.21274029999999988 s
Naive improved: 0.13265090000000002 s
Unique quick: 0.073496 s
Unique stable: 0.1235801999999997 s

朴素方法、朴素改进和唯一稳定方法的结果是具有排序索引列表的字典。来自唯一快速的索引未排序。

基准代码

import array
import timeit
from collections import defaultdict

import numpy as np

def count_naive(a):
    d = defaultdict(list)
    for n, e in enumerate(a):
        d[e].append(n)
    return dict(d)

def count_improved(a):
    d = defaultdict(lambda: array.array("L"))
    alist = array.array("L")
    alist.frombytes(a.tobytes())
    for n in range(len(alist)):
        d[alist[n]].append(n)
    return {n: indices.tolist() for n, indices in d.items()}

def count_unique(a):
    sorted_idx = np.argsort(a)  # , kind='stable')
    counts = np.bincount(a)
    split_idx = np.split(sorted_idx, np.cumsum(counts[:-1]))
    return {n: indices.tolist() for n, indices in enumerate(split_idx)}

def count_stable(a):
    sorted_idx = np.argsort(a, kind="stable")
    counts = np.bincount(a)
    split_idx = np.split(sorted_idx, np.cumsum(counts[:-1]))
    return {n: indices.tolist() for n, indices in enumerate(split_idx)}

a = np.random.randint(1000, size=1000000)

trials = 5
t_naive = timeit.repeat("count_naive(a)", globals=globals(), repeat=trials, number=1)
t_improved = timeit.repeat("count_improved(a)", globals=globals(), repeat=trials, number=1)
t_unique = timeit.repeat("count_unique(a)", globals=globals(), repeat=trials, number=1)
t_stable = timeit.repeat("count_stable(a)", globals=globals(), repeat=trials, number=1)

print(f"Best of {trials}")
print(f"Naive method: {min(t_naive)} s")
print(f"Naive improved: {min(t_improved)} s")
print(f"Unique quick: {min(t_unique)} s")
print(f"Unique stable: {min(t_stable)} s")

注意所有函数的编写方式都返回Dict[int, list],因此可以直接比较结果。

【讨论】:

    【解决方案2】:
    def to_components(index):
        return np.split(np.argsort(index), np.cumsum(np.unique(index, return_counts=True)[1]))
    

    【讨论】:

    • 看起来最简洁的解决方案,并带有常用的numpy函数
    【解决方案3】:

    简单快速的解决方案。

    a = np.array([0, 0, 0, 1, 1, 3, 3, 3, 2, 2, 2, 0, 0, 1, 4])
    sort_idx = np.argsort(a)
    unique, counts = np.unique(a, return_counts=True)
    b = {key: sort_idx[sum(counts[:key]): sum(counts[:key]) + counts[key]] for key in unique}
    

    【讨论】:

    • 这是一个不错的累积总和:您为什么不简单地使用 numpy.cumsum
    【解决方案4】:

    numpy_indexed 包(免责声明:我是它的作者)实现了一个受 Jaime 启发的解决方案;但有测试、漂亮的界面和许多相关功能:

    import numpy_indexed as npi
    unique, idx_groups = npi.group_by(a, np.arange(len(a))
    

    【讨论】:

      【解决方案5】:

      这与here 提出的问题非常相似,所以接下来是对我在那里的回答的改编。将其矢量化的最简单方法是使用排序。以下代码大量借鉴了即将发布的 1.9 版的 np.unique 的实现,其中包括唯一项目计数功能,请参阅 here

      >>> a = np.array([1, 2, 6, 4, 2, 3, 2])
      >>> sort_idx = np.argsort(a)
      >>> a_sorted = a[idx]
      >>> unq_first = np.concatenate(([True], a_sorted[1:] != a_sorted[:-1]))
      >>> unq_items = a_sorted[unq_first]
      >>> unq_count = np.diff(np.nonzero(unq_first)[0])
      

      现在:

      >>> unq_items
      array([1, 2, 3, 4, 6])
      >>> unq_count
      array([1, 3, 1, 1, 1], dtype=int64)
      

      要获取每个值的位置索引,我们只需这样做:

      >>> unq_idx = np.split(sort_idx, np.cumsum(unq_count))
      >>> unq_idx
      [array([0], dtype=int64), array([1, 4, 6], dtype=int64), array([5], dtype=int64),
       array([3], dtype=int64), array([2], dtype=int64)]
      

      您现在可以构建字典压缩unq_itemsunq_idx

      请注意,unq_count 不计算最后一个唯一项的出现次数,因为拆分索引数组不需要这样做。如果你想拥有你可以做的所有价值观:

      >>> unq_count = np.diff(np.concatenate(np.nonzero(unq_first) + ([a.size],)))
      >>> unq_idx = np.split(sort_idx, np.cumsum(unq_count[:-1]))
      

      【讨论】:

        【解决方案6】:

        这可以通过python pandas(python 数据分析库)和DataFrame.groupby 调用来解决。

        考虑以下

         a = np.array([1, 2, 6, 4, 2, 3, 2])
        
         import pandas as pd
         df = pd.DataFrame({'a':a})
        
         gg = df.groupby(by=df.a)
         gg.groups
        

        输出

         {1: [0], 2: [1, 4, 6], 3: [5], 4: [3], 6: [2]}
        

        【讨论】:

        • 我从来没有用过熊猫。这比纯python版本快吗?
        • @imsc,AFAIK 它基于 numpy 处理数据类型,并实现 cython 和纯 C 方法以提高速度。我经常很乐意将它用于大型数据集(约 1000 万条记录)。
        • 谢谢,我会测试并告诉你。
        • @JoshAdel,这是因为到 np.ndarray 的中间转换是由 pandas 执行的。我猜。
        • 仅供参考:10k 个项目 np.ndarray 的 1k 个不同整数:@Jamie 的纯 numpy 方法最快,@Denis Shcheglov 的方法慢 20%(可能是因为您进行了两次排序(argsort + unique)。pandas 慢 20 倍。随机生成器的种子相同,使用 timeit 测试(numpy 1.16.2,pandas 0.24.2)
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-05-24
        • 2021-02-14
        • 2019-06-26
        • 2011-10-08
        相关资源
        最近更新 更多