【问题标题】:combine two arrays and sort合并两个数组并排序
【发布时间】:2012-09-14 15:04:53
【问题描述】:

给定两个排序数组,如下所示:

a = array([1,2,4,5,6,8,9])

b = array([3,4,7,10])

我希望输出是:

c = array([1,2,3,4,5,6,7,8,9,10])

或:

c = array([1,2,3,4,4,5,6,7,8,9,10])

我知道我可以执行以下操作:

c = unique(concatenate((a,b))

我只是想知道是否有更快的方法来做到这一点,因为我正在处理的数组有数百万个元素。

欢迎任何想法。谢谢

【问题讨论】:

  • 我真的怀疑如果不编写一个已编译的扩展来组合 concatenate uniquesort,你会做得更好。
  • 您至少可以删除sort,因为unique 的输出保证已经排序。

标签: python numpy


【解决方案1】:

由于您使用 numpy,我怀疑 bisec 是否对您有帮助...因此,我建议做两件事:

  1. 不要使用np.sort,而是使用c.sort() 方法来对数组进行就地排序并避免复制。
  2. np.unique 必须使用np.sort,这是没有到位的。因此,不要使用np.unique 手动执行逻辑。 IE。首先排序(就地)然后手动执行np.unique 方法(还要检查它的python 代码),使用flag = np.concatenate(([True], ar[1:] != ar[:-1])) unique = ar[flag](对ar 进行排序)。为了更好一点,您可能应该自行进行标志操作,即。 flag = np.ones(len(ar), dtype=bool) 然后np.not_equal(ar[1:], ar[:-1], out=flag[1:]) 基本上避免了flag 的完整副本。
  3. 对此我不确定。但是.sort 有 3 种不同的算法,因为您的数组可能几乎已经排序,更改排序方法可能会产生速度差异。

这将使完整的东西接近你所得到的(无需事先做一个独特的):

def insort(a, b, kind='mergesort'):
    # took mergesort as it seemed a tiny bit faster for my sorted large array try.
    c = np.concatenate((a, b)) # we still need to do this unfortunatly.
    c.sort(kind=kind)
    flag = np.ones(len(c), dtype=bool)
    np.not_equal(c[1:], c[:-1], out=flag[1:])
    return c[flag]

【讨论】:

  • 如果您将这些建议整合到一个函数中,我很乐意将其添加到我在答案中发布的 timeit 框架中。
  • @mgilson 添加了它,但如果你想比较它,请将它与大数组进行比较...小可能不适合这个目的,对于这个应用程序来说,开销太小了.
  • 已测试——目前您在数组大小为 10000 方面处于领先地位。
  • 是的,你的速度是迄今为止最快的
  • 我不得不说——这个答案非常了不起。 (正如我对原始问题的评论中所记录的)我认为您无法对 OP 发布的代码进行很大改进。做得好。 (希望其他人会看到这一点并无偿投票)。
【解决方案2】:

sortednp 包实现了排序后的 numpy 数组的高效合并,只是对值进行排序,而不是使它们唯一:

import numpy as np
import sortednp
a = np.array([1,2,4,5,6,8,9])
b = np.array([3,4,7,10])
c = sortednp.merge(a, b)

我测量了时间并在 this answer to a similar post 中进行了比较,它的性能优于 numpy 的归并排序 (v1.17.4)。

【讨论】:

    【解决方案3】:

    如果您对时间安排感到好奇,最好只关注timeit。下面,我列出了各种方法的一个子集及其时间:

    import numpy as np
    import timeit
    import heapq
    
    
    
    def insort(a, x, lo=0, hi=None):
        if hi is None: hi = len(a)
        while lo < hi:
            mid = (lo+hi)//2
            if x < a[mid]: hi = mid
            else: lo = mid+1
        return lo, np.insert(a, lo, [x])
    
    size=10000
    a = np.array(range(size))
    b = np.array(range(size))
    
    def op(a,b):
        return np.unique(np.concatenate((a,b)))
    
    def martijn(a,b):
        c = np.copy(a)
        lo = 0
        for i in b:
            lo, c = insort(c, i, lo)
        return c
    
    def martijn2(a,b):
        c = np.zeros(len(a) + len(b), a.dtype)
        for i, v in enumerate(heapq.merge(a, b)):
            c[i] = v
    
    def larsmans(a,b):
        return np.array(sorted(set(a) | set(b)))
    
    def larsmans_mod(a,b):
        return np.array(set.union(set(a),b))
    
    
    def sebastian(a, b, kind='mergesort'):
        # took mergesort as it seemed a tiny bit faster for my sorted large array try.
        c = np.concatenate((a, b)) # we still need to do this unfortunatly.
        c.sort(kind=kind)
        flag = np.ones(len(c), dtype=bool)
        np.not_equal(c[1:], c[:-1], out=flag[1:])
        return c[flag]
    

    结果:

    martijn2     25.1079499722
    OP       1.44831800461
    larsmans     9.91507601738
    larsmans_mod     5.87612199783
    sebastian    3.50475311279e-05
    

    我在这里的具体贡献是 larsmans_mod,它避免创建 2 个集合 - 它只创建 1 个,这样做可以将执行时间减少近一半。

    EDIT 删除了 martijn,因为它太慢而无法竞争。还测试了稍大的数组(排序)输入。我也没有测试输出的正确性......

    【讨论】:

    • 当然,如果我们从更大的数组开始,我很想看看它是如何扩展的......
    • 有趣的是,c 数组的不断重新创建显然让我很生气。
    • @MartijnPieters -- 你最近的尝试更好,但仍然没有完全击败 OP。
    • 啊,那是因为我自己从来没有认真使用过 numpy。
    【解决方案4】:

    将元素插入array 的中间是一个非常低效的操作,因为它们在内存中是平坦的,因此每当您插入另一个元素时,您都需要移动所有内容。因此,您可能不想使用bisect。这样做的复杂性大约是O(N^2)

    您当前的方法是O(n*log(n)),这样会好很多,但并不完美。

    将所有元素插入哈希表(例如set)是一件事情。这将花费O(N) 时间来进行唯一化,但随后您需要对将花费O(n*log(n)) 的时间进行排序。还是不太好。

    真正的O(N) 解决方案涉及分配一个数组,然后通过获取输入列表的最小头部来一次填充一个元素,即。合并。不幸的是,numpy 和 Python 似乎都没有这样的东西。解决方案可能是在 Cython 中编写一个。

    它看起来像下面这样:

    def foo(numpy.ndarray[int, ndim=1] out,
            numpy.ndarray[int, ndim=1] in1, 
            numpy.ndarray[int, ndim=1] in2):
    
            cdef int i = 0
            cdef int j = 0
            cdef int k = 0
            while (i!=len(in1)) or (j!=len(in2)):
                # set out[k] to smaller of in[i] or in[j]
                # increment k
                # increment one of i or j
    

    【讨论】:

    • 同意,像这样编写一个小的 Cython 扩展是一个不错的选择。
    • 只要您对预排序输入的假设是正确的,效率就会大大提高。看来 OP 打算这样做。
    • 您可以随时检查您的输入是否已排序,这只需要O(n) 时间,因此不会增加复杂性。
    【解决方案5】:

    您可以使用 bisect module 进行此类合并,将第二个 python 列表合并到第一个。

    bisect* 函数适用于 numpy 数组,但 insort* 函数不适用。使用module source code 来调整算法很容易,非常基础:

    from numpy import array, copy, insert
    
    def insort(a, x, lo=0, hi=None):
        if hi is None: hi = len(a)
        while lo < hi:
            mid = (lo+hi)//2
            if x < a[mid]: hi = mid
            else: lo = mid+1
        return lo, insert(a, lo, [x])
    
    a = array([1,2,4,5,6,8,9])
    b = array([3,4,7,10])
    
    c = copy(a)
    lo = 0
    for i in b:
        lo, c = insort(c, i, lo)
    

    并不是说自定义insort 真的在这里添加任何东西,默认bisect.bisect 也可以正常工作:

    import bisect
    
    c = copy(a)
    lo = 0
    for i in b:
        lo = bisect.bisect(c, i)
        c = insert(c, i, lo)
    

    使用这个改编的insort 比合并和排序更有效。因为b 也已排序,所以我们可以跟踪lo 插入点并从那里开始搜索下一个点,而不是每次循环都考虑整个数组。

    如果您不需要保留a,只需直接对该数组进行操作并保存副本即可。

    效率更高:因为两个列表都是排序的,我们可以使用heapq.merge

    from numpy import zeros
    import heapq
    
    c = zeros(len(a) + len(b), a.dtype)
    for i, v in enumerate(heapq.merge(a, b)):
        c[i] = v
    

    【讨论】:

    • 只是我刚刚想出的代码。但我意识到,如果数组“b”有数百万个元素长,那么机器将在您的代码中重新分配数组“c”数百万次。这对我来说看起来不是很有效
    • @Jun:是的,我不太喜欢插入,所以我找到了一个更好的方法,仍然使用预分配的数组和 heapq.merge。
    【解决方案6】:

    除了使用bisect.insort的其他答案,如果您对性能不满意,您可以尝试使用blist模块和bisect。它应该会提高性能。

    传统的listinsertion complexity is O(n),而blist's complexity on insertion is O(log(n))

    此外,您的数组似乎已排序。如果是这样,您可以使用heapq mudule 中的merge 函数来利用两个数组都已预先排序的事实。由于在内存中创建了一个新数组,这种方法会产生开销。这可能是一个可以考虑的选项,因为此解决方案的时间复杂度为 O(n+m),而带有 insort 的解决方案为 O(n*m) 复杂度(n 个元素 * m 个插入)

    import heapq
    
    a = [1,2,4,5,6,8,9]
    b = [3,4,7,10]
    
    
    it = heapq.merge(a,b) #iterator consisting of merged elements of a and b
    L = list(it) #list made of it
    print(L)
    

    输出:

    [1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]
    

    如果要删除重复值,可以使用groupby

    import heapq
    import itertools
    
    a = [1,2,4,5,6,8,9]
    b = [3,4,7,10]
    
    
    it = heapq.merge(a,b) #iterator consisting of merged elements of a and b
    it = (k for k,v in itertools.groupby(it))
    L = list(it) #list made of it
    print(L)
    

    输出:

    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    

    【讨论】:

    • 看起来不错,但我担心它是否会像 numpy 数组一样高效内存
    • @Jun 你应该做一些测试来确定内存消耗。传统数组和列表的问题是插入它们需要将插入后的所有元素移动 1。这需要O(n)。在blist 中,他们使用另一种数据结构,因此复杂度为O(log(n))。对于大型数组和列表,它将发生巨大变化。
    【解决方案7】:

    为此使用bisect module

    import bisect
    
    a = array([1,2,4,5,6,8,9])
    b = array([3,4,7,10])
    
    for i in b:
        pos = bisect.bisect(a, i)
        insert(a,[pos],i) 
    

    我现在无法测试这个,但它 should 工作

    【讨论】:

    • numpy 数组没有insert 方法。
    • 如果它确实有一个insert方法,你应该改用bisect.insort
    • 由于第二个数组也已排序,因此每次跟踪插入位置可能会更好?
    • @Jun 是的,当然,你可以这样做
    【解决方案8】:

    似乎没有人提到union1d (union1d)。目前,它是unique(concatenate((ar1, ar2))) 的快捷方式,但它的名称很短,值得记住,并且由于它是一个库函数,因此它有可能被 numpy 开发人员优化。它的性能与 seberg 接受的大型数组答案中的insort 非常相似。这是我的基准:

    import numpy as np
    
    def insort(a, b, kind='mergesort'):
        # took mergesort as it seemed a tiny bit faster for my sorted large array try.
        c = np.concatenate((a, b))  # we still need to do this unfortunatly.
        c.sort(kind=kind)
        flag = np.ones(len(c), dtype=bool)
        np.not_equal(c[1:], c[:-1], out=flag[1:])
        return c[flag]
    
    size = int(1e7)
    a = np.random.randint(np.iinfo(np.int).min, np.iinfo(np.int).max, size)
    b = np.random.randint(np.iinfo(np.int).min, np.iinfo(np.int).max, size)
    
    np.testing.assert_array_equal(insort(a, b), np.union1d(a, b))
    
    import timeit
    repetitions = 20
    print("insort: %.5fs" % (timeit.timeit("insort(a, b)", "from __main__ import a, b, insort", number=repetitions)/repetitions,))
    print("union1d: %.5fs" % (timeit.timeit("np.union1d(a, b)", "from __main__ import a, b; import numpy as np", number=repetitions)/repetitions,))
    

    我的机器上的输出:

    insort: 1.69962s
    union1d: 1.66338s
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-04-05
      • 2021-10-11
      • 1970-01-01
      • 1970-01-01
      • 2015-12-15
      • 2019-09-09
      • 2021-03-04
      相关资源
      最近更新 更多