【问题标题】:Fastest way to obtain the largest X numbers from a very large unsorted list?从非常大的未排序列表中获取最大 X 数的最快方法?
【发布时间】:2010-12-08 20:21:38
【问题描述】:

我正在尝试从我的程序生成的分数列表中获得最高的分数,即 100 分。不幸的是,列表很大(大约数百万到数十亿),因此排序是程序中耗时的部分。

进行排序以获得前 100 名的最佳方法是什么?

到目前为止,我能想到的唯一两种方法是,首先将所有分数生成一个庞大的数组,然后对其进行排序并获取前 100 名。或者第二种,生成 X 个分数,对其进行排序并截断前 100 名score 然后继续生成更多分数,将它们添加到截断列表中,然后再次对其进行排序。

无论我怎么做,它仍然需要比我想要的更多的时间,关于如何以更有效的方式做到这一点的任何想法? (我以前从未上过编程课程,也许你们这些有计算机科学学位的人知道有效的算法可以做到这一点,至少这是我所希望的)。

最后,c++中标准sort()函数使用的排序算法是什么?

谢谢,

-伪造

编辑:仅供任何好奇的人使用...

我在之前和之后做了一些时间试验,结果如下:

旧程序(每次外循环迭代后执行排序):

top 100 scores: 147 seconds
top  10 scores: 147 seconds
top   1 scores: 146 seconds
Sorting disabled: 55 seconds

新程序(实现只跟踪最高分并使用默认排序功能):

top 100 scores: 350 seconds <-- hmm...worse than before
top  10 scores: 103 seconds 
top   1 scores:  69 seconds 
Sorting disabled: 51 seconds

新的重写(存储数据的优化,手写排序算法):

top 100 scores: 71 seconds <-- Very nice!
top  10 scores: 52 seconds
top   1 scores: 51 seconds
Sorting disabled: 50 seconds

在核心 2、1.6 GHz 上完成...我等不及我的核心 i7 860 到货...

我还有很多其他更激进的优化需要解决(主要是在减少我运行的迭代次数方面),但就目前而言,速度已经足够好,我可能不会甚至费心去解决那些算法优化。

感谢大家的意见!

【问题讨论】:

  • 只是好奇,您生产的数字范围是多少?似乎从十亿个数字列表中取出前 100 个将在顶部有很多重复值,除非您的分数本身是非常大的数字。
  • 我不知道有一个标准的 sort()。你在用什么图书馆?这可能是一种快速排序。
  • 我的数字范围是可变的,我有一些加权分数可以调整以改变范围。目前,它在 3000 到大约 40000 之间。数字类型是 Int,所以我可以使用全部范围。我使用的标准库是 .
  • 所以你有一组可能的 37,000 个分数。如果你有 10 亿个分数,假设任何类型的正态分布,你的整个前 100 名将是相同的分数。 37,000 适合 10 亿次超过 27,000 次
  • 是的,但问题是分数不是均匀分布的,而是正态分布的(钟形曲线)。我正在寻找最高分,因此重复的并不多。

标签: c++ optimization visual-c++ sorting


【解决方案1】:

你想要绝对最大的 X 数字,所以我猜你不想要某种启发式方法。列表的未排序程度如何?如果它非常随机,那么您最好的选择实际上就是对整个列表进行快速排序并获取前 X 个结果。

如果您可以在列表生成期间过滤分数,那就更好了。只存储 X 值,每次获得新值时,将其与那些 X 值进行比较。如果少于所有这些,则将其丢弃。如果它大于其中之一,则丢弃新的最小值。

如果 X 足够小,您甚至可以保持 X 值列表排序,以便将新数字与排序后的值列表进行比较,您可以进行 O(1) 检查以查看新值是否更小比所有其他人都多,因此将其扔掉。否则,快速二分查找可以找到新值在列表中的位置,然后您可以丢弃数组的第一个值(假设第一个元素是最小元素)。

【讨论】:

  • 鉴于您需要查看列表中的每个元素遇到更大的数字时换出?
  • 是的,这将要求 100 的列表也保持排序。
【解决方案2】:
  1. 获取前 100 个分数,并将它们排序到一个数组中。
  2. 获取下一个分数,并将其插入排序到数组中(从“小”端开始)
  3. 删除第 101 个值
  4. 在 2 处继续下一个值,直到完成

随着时间的推移,列表会越来越像 100 个最大值,所以更多时候,您会发现插入排序立即中止,发现新值小于前 100 个候选者的最小值。

【讨论】:

  • +1 表示不需要跟踪前 100 个元素以外的任何内容。希望我也可以为建议插入排序加分。
  • 很好,我喜欢它的美丽,简单而高效!
  • 退化的情况是你的原始列表是反向排序的。这将比平均情况长 100 倍,但仍然是 O(n)。
  • 实际上,您可以在 O(logn) 时间内找到 j 处的插入点,并将 j+1 处的元素向下移动到 count-1 个位置,从而在 count-1 处删除最小值。只有当新元素大于 count-1 处的元素时,您才能执行此操作。但是,如果您要这样做,您不妨按照 Jack Lloyd 的建议使用堆。
  • 在这里使用一个固定的链表桶池可能有助于延长时间(不是算法性能);但这是微不足道的,需要进行基准测试。很好的答案和cmets。
【解决方案3】:

将数据放入一个平衡的树结构(可能是红黑树)中,该结构进行就地排序。插入应该是 O(lg n)。获取最高的 x 分数也应该是 O(lg n)。

如果您发现在某些时候需要优化,您可以每隔一段时间修剪一次树。

【讨论】:

  • 我确实提到我没有上过编程课程,对不起,你过头了....
  • 如果您有某种库可以对数组或列表进行排序,那么该库也可能有类似 TreeMap 之类的东西可以解决问题。
【解决方案4】:

如果您只需要报告前 100 个分数的值(而不是任何相关数据),并且如果您知道分数都将在有限范围内,例如 [0,100],那么一个简单的方法可以做到这一点与“计数排序”...

基本上,创建一个表示所有可能值的数组(例如,如果分数可以在 0 到 100 的范围内,则创建一个大小为 101 的数组),并将数组的所有元素初始化为 0。然后,遍历列表分数,增加已达到分数列表中的相应条目。也就是说,编译该范围内每个分数已达到的次数。然后,从数组的末尾到数组的开头,您可以挑选出最高的 X 分数。这是一些伪代码:

让 type Score 为 0 到 100 之间的整数,包括 0 到 100。 让 score 是一个 Score 对象的数组 设 scorerange 为大小为 101 的整数数组。 对于 [0,100] 中的我 设置得分范围 [i] = 0 对于分数中的每个分数 设置 scorerange[score] = scorerange[score] + 1 让 top 是要报告的最高分数的数量 让 idx 是一个初始化为 scorerange 末尾的整数(即 100) 而(顶部> 0)和(idx> = 0): 如果 scorerange[idx] > 0: 报告“有” scorerange[idx] “得分值为” idx top = top - scorerange[idx] idx = idx - 1;

【讨论】:

    【解决方案5】:

    声明一个数组,您可以在其中放置 100 个最佳分数。遍历巨大的列表并检查每个项目是否有资格插入前 100 名。使用简单的插入排序将项目添加到顶部列表。

    类似这样的东西(C# 代码,但你明白了):

    Score[] toplist = new Score[100];
    int size = 0;
    foreach (Score score in hugeList) {
       int pos = size;
       while (pos > 0 && toplist[pos - 1] < score) {
          pos--;
          if (pos < 99) toplist[pos + 1] = toplist[pos];
       }
       if (size < 100) size++;
       if (pos < size) toplist[pos] = score;
    }
    

    我在我的电脑上测试了它(Code 2 Duo 2.54 MHz Win 7 x64),我可以在 369 毫秒内处理 100.000.000 个项目。

    【讨论】:

    • 嗯,所以在我进行插入排序之前首先生成整个分数数组......我想在实现它之前必须看看哪种方法会产生最多的缓存命中。谢谢。
    • @Faken:我不知道它是否与缓存命中有关,但显然这段代码比 Jack Lloyd 使用堆的代码快 700 倍......
    【解决方案6】:

    您可以使用堆在 O(n) 时间内完成此操作,无需任何排序:

    #!/usr/bin/python
    
    import heapq
    
    def top_n(l, n):
        top_n = []
    
        smallest = None
    
        for elem in l:
            if len(top_n) < n:
                top_n.append(elem)
                if len(top_n) == n:
                    heapq.heapify(top_n)
                    smallest = heapq.nsmallest(1, top_n)[0]
            else:
                if elem > smallest:
                    heapq.heapreplace(top_n, elem)
                    smallest = heapq.nsmallest(1, top_n)[0]
    
        return sorted(top_n)
    
    
    def random_ints(n):
        import random
        for i in range(0, n):
            yield random.randint(0, 10000)
    
    print top_n(random_ints(1000000), 100)
    

    在我的机器上运行的时间(Core2 Q6600、Linux、Python 2.6,使用 bash time builtin 测量):

    • 100000 个元素:0.29 秒
    • 1000000 个元素:2.8 秒
    • 10000000 个元素:25.2 秒

    编辑/添加:在 C++ 中,您可以使用 std::priority_queue,就像这里使用 Python 的 heapq 模块一样。您需要使用std::greater 排序而不是默认的std::less,以便top() 成员函数返回最小元素而不是最大元素。 C++ 的优先级队列没有 heapreplace 的等价物,它将顶部元素替换为新元素,因此您需要 pop 顶部(最小)元素,然后 push 新看到的值。除此之外,该算法非常干净地从 Python 转换为 C++。

    【讨论】:

    • @strager 对于任何常数 X,比如 100,堆操作可以被视为常数时间,因为它们是 log(X) 或 X*log(X);在 X 为常数的情况下,这些被渐近地视为 O(1)。而且这不是一种排序方法,真的,除非你设置 X = N,在这种情况下,当然,X 不是一个常数。
    • @Lloyd,是的,我意识到了这一点。 =X
    【解决方案7】:

    你可以在 Haskell 中这样做:

    largest100 xs = take 100 $ sortBy (flip compare) xs
    

    这看起来像是将所有数字按降序排序(“翻转比较”位将参数反转到标准比较函数),然后返回列表中的前 100 个条目。但是 Haskell 是惰性求值的,所以 sortBy 函数只进行了足够的排序以找到列表中的前 100 个数字,然后停止。

    纯粹主义者会注意到,您也可以将函数编写为

    largest100 = take 100 . sortBy (flip compare)
    

    这意味着同样的事情,但说明了 Haskell 风格,即从其他函数的构建块中组合一个新函数,而不是在周围处理变量。

    【讨论】:

      【解决方案8】:

      我在 2008 年回答面试问题时回答了这个问题。我实现了 templatized priority queue in C#

      using System;
      using System.Collections.Generic;
      using System.Text;
      
      namespace CompanyTest
      {
          //  Based on pre-generics C# implementation at
          //      http://www.boyet.com/Articles/WritingapriorityqueueinC.html
          //  and wikipedia article
          //      http://en.wikipedia.org/wiki/Binary_heap
          class PriorityQueue<T>
          {
              struct Pair
              {
                  T val;
                  int priority;
                  public Pair(T v, int p)
                  {
                      this.val = v;
                      this.priority = p;
                  }
                  public T Val { get { return this.val; } }
                  public int Priority { get { return this.priority; } }
              }
              #region Private members
              private System.Collections.Generic.List<Pair> array = new System.Collections.Generic.List<Pair>();
              #endregion
              #region Constructor
              public PriorityQueue()
              {
              }
              #endregion
              #region Public methods
              public void Enqueue(T val, int priority)
              {
                  Pair p = new Pair(val, priority);
                  array.Add(p);
                  bubbleUp(array.Count - 1);
              }
              public T Dequeue()
              {
                  if (array.Count <= 0)
                      throw new System.InvalidOperationException("Queue is empty");
                  else
                  {
                      Pair result = array[0];
                      array[0] = array[array.Count - 1];
                      array.RemoveAt(array.Count - 1);
                      if (array.Count > 0)
                          trickleDown(0);
                      return result.Val;
                  }
              }
              #endregion
              #region Private methods
              private static int ParentOf(int index)
              {
                  return (index - 1) / 2;
              }
              private static int LeftChildOf(int index)
              {
                  return (index * 2) + 1;
              }
              private static bool ParentIsLowerPriority(Pair parent, Pair item)
              {
                  return (parent.Priority < item.Priority);
              }
              //  Move high priority items from bottom up the heap
              private void bubbleUp(int index)
              {
                  Pair item = array[index];
                  int parent = ParentOf(index);
                  while ((index > 0) && ParentIsLowerPriority(array[parent], item))
                  {
                      //  Parent is lower priority -- move it down
                      array[index] = array[parent];
                      index = parent;
                      parent = ParentOf(index);
                  }
                  //  Write the item once in its correct place
                  array[index] = item;
              }
              //  Push low priority items from the top of the down
              private void trickleDown(int index)
              {
                  Pair item = array[index];
                  int child = LeftChildOf(index);
                  while (child < array.Count)
                  {
                      bool rightChildExists = ((child + 1) < array.Count);
                      if (rightChildExists)
                      {
                          bool rightChildIsHigherPriority = (array[child].Priority < array[child + 1].Priority);
                          if (rightChildIsHigherPriority)
                              child++;
                      }
                      //  array[child] points at higher priority sibling -- move it up
                      array[index] = array[child];
                      index = child;
                      child = LeftChildOf(index);
                  }
                  //  Put the former root in its correct place
                  array[index] = item;
                  bubbleUp(index);
              }
              #endregion
          }
      }
      

      【讨论】:

        【解决方案9】:

        这是执行此操作的“自然”C++ 方式:

        std::vector<Score> v;
        // fill in v
        std::partial_sort(v.begin(), v.begin() + 100, v.end(), std::greater<Score>());
        std::sort(v.begin(), v.begin() + 100);
        

        这与分数的数量成线性关系。

        标准没有指定 std::sort 使用的算法,但 libstdc++(由 g++ 使用)使用“自适应引入排序”,本质上是将 3 中值快速排序降低到某个级别,然后通过插入排序。

        【讨论】:

        • 对,就是想这样回答!
        【解决方案10】:

        由于速度在这里至关重要,并且 40.000 个可能的高分值完全可以由当今的任何计算机维护,为了简单起见,我将求助于桶排序。我的猜测是它会优于迄今为止提出的任何算法。缺点是您必须确定高分值的一些上限。

        所以,假设您的最高分值为 40.000:

        创建一个包含 40.000 个条目的数组。循环浏览您的高分值。每次遇到高分 x 时,将 array[x] 加一。在此之后,您所要做的就是计算数组中的最高条目,直到达到 100 个计数的高分。

        【讨论】:

        • 好吧,桶排序可以找到我的前 100 分,但它只会给我最高分。我想这是我的错,我没有完全按照我应该定义的那样定义问题。每个分数都来自 3 个值,这些分数中的每一个都必须将这 3 个值与分数一起标记,因此桶排序不适合我的需要。但是你说得对,如果范围很小并且我没有对类进行排序,那么这种方法将明显优于其他方法。
        • 嗯...再想一想,如果我实现了附加到每个桶以存储其他数据的某种列表,它可能会起作用...但除非我放,否则这将非常占用内存某处的截止点,但即使那样,如果不迭代整个数据集,我也无法猜测出高范围。
        • 但话又说回来,我总是可以在每次说之后实现一个截止,外循环迭代检查我的前 100 个分数在哪里,以及一个 if 语句来检查下一个分数是否在那个高分值内。 .这实际上可能会更有效率!唯一的缺点是内存使用情况,目前的最佳答案只使用最多 400Kb 的内存总量......但话又说回来,8 GB 的内存,几百 MB 是什么? (错误,我猜缓存与此有很大关系……虽然早期的程序完全可以很好地放置在 L2 缓存中)。不管怎样,这很有趣……
        • 并非如此。请记住,在您的数组中,您只会存储指向您选择用于封装数据的任何结构的指针。在这个新场景中,每个潜在的 40.000 个插槽中都有一个指针数组。因此,您将有空间容纳 40.000 个 32 位指针,而不是 40.000 个 32 位整数。至于数据本身,无论如何都必须存储它,因此不会在那里花费多余的内存。您还可以实现一个安全阀功能,如果有高于最高估计值的值,它将调整您的 40.000 数组的大小,即 10.000。
        • +1 来自我。 Bucketsorts(和它的表兄弟)没有得到充分利用。虽然在实践中如果 std::partial_sort “足够快”,我会选择它,因为它更简单。
        【解决方案11】:
        猜你喜欢
        • 2016-02-10
        • 1970-01-01
        • 2014-01-22
        • 1970-01-01
        • 2018-11-11
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多