【问题标题】:Any better/faster way to calculate the relative rank of each element in an array?有没有更好/更快的方法来计算数组中每个元素的相对排名?
【发布时间】:2020-08-12 16:10:38
【问题描述】:

我想计算数组中每个元素在它之前的元素之间的相对排名。例如,在数组[2,1,4,3] 中,第二个元素(1)在[2,1] 的子集数组中的相对排名(从小到大)为1。第三个元素(4)在[2,1,4] 的子集数组中的相对排名是3。每个元素的最终相对排名应该是[1,1,3,3]

我正在使用以下 python 代码:

x = np.array([2,1,4,3])
rr = np.ones(4)
for i in range(1,4):
    rr[i] = sum(x[i] >= x[:i+1])

还有其他更快的方法吗?

【问题讨论】:

  • 我可以为排名库担保。 pypi.org/project/ranking
  • @AKX:这完全不同。
  • 啊,是的。阅读问题有点太快了......
  • 对此的经典算法是反转跟踪合并排序。我太困了,现在无法给出很好的解释,但谷歌搜索应该会找到有用的资源。

标签: python performance numpy sorting rank


【解决方案1】:

不确定它是否更快,但您可以通过列表理解来做到这一点,这总是让我开心:

[sorted(x[:i+1]).index(v)+1 for i, v in enumerate(x)]

【讨论】:

    【解决方案2】:

    这是broadcasting的矢量化方式-

    n = len(x)
    m1 = x[1:,None]>=x
    m2 = np.tri(n-1,n,k=1, dtype=bool)
    rr[1:] = (m1 & m2).sum(1)
    

    或者,我们可以引入einsumnp.matmul 来完成sum-reduction 的最后一步-

    (m1.astype(np.float32)[:,None,:] @ m2[:,:,None])[:,0,0]
    np.einsum('ij,ij->i',m1.astype(np.float32),m2)
    

    【讨论】:

      【解决方案3】:

      您当前的算法需要二次时间,这不会扩展到大型输入。你可以做得更好。

      一种更好的方法是使用排序数据结构,如sortedcontainers.SortedList,并执行一系列查找和插入。以下示例实现返回一个列表,假设没有平局,并从 0 开始排名:

      import sortedcontainers
      
      def rank(nums):
          sortednums = sortedcontainers.SortedList()
          ranks = []
          for num in nums:
              ranks.append(sortednums.bisect_left(num))
              sortednums.add(num)
          return ranks
      

      大部分工作都在 SortedList 实现中,SortedList 非常快,所以这不应该有太多的 Python 开销。 sortedcontainers 的存在肯定比下一个选项更方便,即使不一定更有效。

      这个选项运行在... O(n log n)-ish 时间。 SortedList 使用两层层次结构而不是传统的树结构,故意权衡更多数据移动以减少指针追逐,因此插入理论上不是 O(log n),但在实践中是有效的。


      下一个选项是使用增强的归并排序。如果你这样做,你会想要使用 Numba 或 Cython,因为你必须手动编写循环。

      基本思想是进行归并排序,但在进行时跟踪其子数组中每个元素的排名。当您合并两个已排序的子数组时,左侧的每个元素都保持其旧排名,而右侧元素的排名值会根据左侧的元素数量向上调整。

      此选项运行时间为 O(n log n)。

      在 Python 列表上运行的未优化实现,假设没有关联,并且从 0 开始排名,如下所示:

      def rank(nums):
          _, indexes, ranks = _augmented_mergesort(nums)
          result = [None]*len(nums)
          for i, rank_ in zip(indexes, ranks):
              result[i] = rank_
          return result
      
      def _augmented_mergesort(nums):
          # returns sorted nums, indexes of sorted nums in original nums, and corresponding ranks
          if len(nums) == 1:
              return nums, [0], [0]
          left, right = nums[:len(nums)//2], nums[len(nums)//2:]
          return _merge(*_augmented_mergesort(left), *_augmented_mergesort(right))
      
      def _merge(lnums, lindexes, lranks, rnums, rindexes, rranks):
          nums, indexes, ranks = [], [], []
          i_left = i_right = 0
      
          def add_from_left():
              nonlocal i_left
              nums.append(lnums[i_left])
              indexes.append(lindexes[i_left])
              ranks.append(lranks[i_left])
              i_left += 1
          def add_from_right():
              nonlocal i_right
              nums.append(rnums[i_right])
              indexes.append(rindexes[i_right] + len(lnums))
              ranks.append(rranks[i_right] + i_left)
              i_right += 1
      
          while i_left < len(lnums) and i_right < len(rnums):
              if lnums[i_left] < rnums[i_right]:
                  add_from_left()
              elif lnums[i_left] > rnums[i_right]:
                  add_from_right()
              else:
                  raise ValueError("Tie detected")
      
          if i_left < len(lnums):
              nums += lnums[i_left:]
              indexes += lindexes[i_left:]
              ranks += lranks[i_left:]
          else:
              while i_right < len(rnums):
                  add_from_right()
      
          return nums, indexes, ranks
      

      对于优化的实现,您需要一个插入排序基本情况,您需要使用 Numba 或 Cython,您需要对数组进行操作,并且您不想做太多的分配。

      【讨论】:

      • 感谢您的详细解释。我会尝试理解你建议的每一个算法,第一个是迄今为止所有回复中最好的!
      • 我试过你的第二个算法,它不如你的第一个。我也尝试将您的算法应用于二维数组,但与 numy 数组计算相比,它们的性能并不令人满意。有关 2D 数组测试,请参阅我的新帖子。
      • @John:第二个实现未优化。你必须用 Cython 或 Numba 重写它才能看到更好的性能。至于 2D 案例,在 Divarkar 的版本中,您不应该同时使用 (m1.astype(np.float32)[:,None,:] @ m2[:,:,None])[:,0,0] np.einsum('ij,ij-&gt;i',m1.astype(np.float32),m2)
      • 感谢您的回复。我将尝试学习如何为 numba 编写代码。我是 Python 新手。
      【解决方案4】:

      你们都是我的英雄!做得很好!我想向您展示每种解决方案的比较:

      import numpy as np
      import time
      import sortedcontainers
      
      def John(x):
          n=len(x)
          rr=np.ones(n)
          for i in range(1,n):
              rr[i]=sum(x[i]>=x[:i+1])
          return rr
      
      def Matvei(x):
          return [sorted(x[:i+1]).index(v)+1 for i, v in enumerate(x)]
      
      def Divarkar1(x):
          n = len(x)
          m1 = x[1:,None]>=x
          m2 = np.tri(n-1,n,k=1, dtype=bool)
          rr[1:] = (m1 & m2).sum(1)
          return rr
      
      
      def Divarkar2(x):
          n = len(x)
          m1 = x[1:,None]>=x
          m2 = np.tri(n-1,n,k=1, dtype=bool)
          (m1.astype(np.float32)[:,None,:] @ m2[:,:,None])[:,0,0]
          rr[1:]=np.einsum('ij,ij->i',m1.astype(np.float32),m2)
          return rr
      
      def Monica(x):
          sortednums = sortedcontainers.SortedList()
          ranks = []
          for num in x:
              ranks.append(sortednums.bisect_left(num))
              sortednums.add(num)
          return np.array(ranks)+1
      
      
      x=np.random.rand(4000)
      
      t1=time.time()
      rr=John(x)
      t2=time.time()
      print(t2-t1)
      #print(rr)
      
      t1=time.time()
      rr=Matvei(x)
      t2=time.time()
      print(t2-t1)
      #print(rr)
      
      t1=time.time()
      rr=Divarkar1(x)
      t2=time.time()
      print(t2-t1)
      #print(rr)
      
      t1=time.time()
      rr=Divarkar2(x)
      t2=time.time()
      print(t2-t1)
      #print(rr)
      
      t1=time.time()
      rr=Monica(x)
      t2=time.time()
      print(t2-t1)
      #print(rr)
      

      结果是:

      19.5

      2.9

      0.079

      0.25

      0.017

      我跑了几次,结果都差不多。最好的是莫妮卡的算法!

      非常感谢大家!

      约翰

      【讨论】:

      • 如果您有时间添加到这篇文章中,我想看看我的解决方案如何使用您的基准进行比较。
      【解决方案5】:

      当我将所有算法转换为 numpy 二维数组时,我发现我的算法是最好的。当然性能也取决于二维数组的维度。但 380x900 是我的情况。我认为 Numpy 数组计算受益匪浅。以下是代码:

      import numpy as np
      import time
      import sortedcontainers
      def John(x): #x is 1D array
          n=len(x)
          rr=[]
          for i in range(n):
              rr.append(np.sum(x[i]>=x[:i+1]))
          return np.array(rr)
      
      def John_2D(rv): #rv is 2d numpy array. rank it along axis 1!
          nr,nc=rv.shape
          rr=[]
          for i in range(nc):
              rr.append(np.sum((rv[:,:i+1]<=rv[:,i:i+1]),axis=1))
          return np.array(rr).T
      
      def Matvei(x): #x is 1D array
          return [sorted(x[:i+1]).index(v)+1 for i, v in enumerate(x)]
      
      def Divarkar1(x):#x is 1D array
          n = len(x)
          rr=np.ones(n,dtype=int)
          m1 = x[1:,None]>=x
          m2 = np.tri(n-1,n,k=1, dtype=bool)
          rr[1:] = (m1 & m2).sum(1)
          return rr
      
      def Divarkar2(x):#x is 1D array
          n = len(x)
          rr=np.ones(n,dtype=int)
          m1 = x[1:,None]>=x
          m2 = np.tri(n-1,n,k=1, dtype=bool)
          (m1.astype(np.float32)[:,None,:] @ m2[:,:,None])[:,0,0]
          rr[1:]=np.einsum('ij,ij->i',m1.astype(np.float32),m2)
          return rr
      
      def Monica1(nums): #nums is 1D array
          sortednums = sortedcontainers.SortedList()
          ranks = []
          for num in nums:
              ranks.append(sortednums.bisect_left(num))
              sortednums.add(num)
          return np.array(ranks)+1
      
      def Monica2(nums): #nums is 1D array
          _, indexes, ranks = _augmented_mergesort(nums)
          result = [None]*len(nums)
          for i, rank_ in zip(indexes, ranks):
              result[i] = rank_
          return np.array(result)+1
      
      def _augmented_mergesort(nums): #nums is 1D array
          # returns sorted nums, indexes of sorted nums in original nums, and corresponding ranks
          if len(nums) == 1:
              return nums, [0], [0]
          left, right = nums[:len(nums)//2], nums[len(nums)//2:] #split the array by half
          return _merge(*_augmented_mergesort(left), *_augmented_mergesort(right))
      
      def _merge(lnums, lindexes, lranks, rnums, rindexes, rranks):
          nums, indexes, ranks = [], [], []
          i_left = i_right = 0
      
          def add_from_left():
              nonlocal i_left
              nums.append(lnums[i_left])
              indexes.append(lindexes[i_left])
              ranks.append(lranks[i_left])
              i_left += 1
          def add_from_right():
              nonlocal i_right
              nums.append(rnums[i_right])
              indexes.append(rindexes[i_right] + len(lnums))
              ranks.append(rranks[i_right] + i_left)
              i_right += 1
      
          while i_left < len(lnums) and i_right < len(rnums):
              if lnums[i_left] < rnums[i_right]:
                  add_from_left()
              elif lnums[i_left] > rnums[i_right]:
                  add_from_right()
              else:
                  raise ValueError("Tie detected")
      
          if i_left < len(lnums):
              while i_left < len(lnums):
                  add_from_left()
              #nums += lnums[i_left:]
              #indexes += lindexes[i_left:]
              #ranks += lranks[i_left:]
          else:
              while i_right < len(rnums):
                  add_from_right()
          return nums, indexes, ranks
      
      def rank_2D(f,nums): #f is method, nums is 2D numpy array
          result=[]
          for x in nums:
              result.append(f(x))
          return np.array(result)
      
      x=np.random.rand(6000)
      for f in [John, Matvei, Divarkar1, Divarkar2, Monica1, Monica2]:
          t1=time.time()
          rr=f(x)
          t2=time.time()
          print(f'{f.__name__+"_1D: ":16}  {(t2-t1):.3f}')
      print()
      
      x=np.random.rand(380,900)
      t1=time.time()
      rr=John_2D(x)
      t2=time.time()
      print(f'{"John_2D:":16}  {(t2-t1):.3f}')
      #print(rr)
      
      for f in [Matvei, Divarkar1, Divarkar2, Monica1, Monica2]:
          t1=time.time()
          rr=rank_2D(f,x)
          t2=time.time()
          print(f'{f.__name__+"_2D: ":16}  {(t2-t1):.3f}')
          #print(rr)
      

      典型的结果是:

      John_1D:          0.069
      Matvei_1D:        7.208
      Divarkar1_1D:     0.163
      Divarkar2_1D:     0.488
      Monica1_1D:       0.032
      Monica2_1D:       0.082
      
      John_2D:          0.409
      Matvei_2D:        49.044
      Divarkar1_2D:     1.276
      Divarkar2_2D:     4.065
      Monica1_2D:       1.090
      Monica2_2D:       3.571
      

      对于一维数组,Monica1 方法是最好的,但我的 numpy-version 方法也不错。 对于二维数组,我的 numpy-version 方法是最好的。

      欢迎测试和评论。

      谢谢

      约翰

      【讨论】:

      • 这很奇怪。您之前说过您的代码在 4000 个数字的数组上需要 19.5 秒,但现在在 6000 个数字的数组上只需要 0.069 秒?将rr 从列表更改为数组并不能说明这一点。出了点问题。
      • 我看到你也切换到np.sum。可能就是这样。
      • 是的,我改成numpy计算了,自然是针对二维数组的,性能大幅度提升!
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-09-19
      • 2020-10-17
      • 1970-01-01
      • 1970-01-01
      • 2017-11-29
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多