【问题标题】:Fast calculation of min, max, and average of incoming numbers快速计算输入数字的最小值、最大值和平均值
【发布时间】:2012-04-23 20:49:07
【问题描述】:

程序每秒接收大约 50,000 个号码。

在任何给定时刻,我都需要计算在最后一秒(关于给定时刻)到达的值(数字)的最小值、最大值和平均值。

有没有办法不使用数组或列表(缓冲区)来存储到达的数字并计算结果?

如果我需要使用缓冲区,实现此目的的有效方法是什么?

(请注意,缓冲区中的数字也必须不时有效地删除)

【问题讨论】:

  • 你能保证号码按顺序到达吗?
  • 您说“大约 50,000”会有所不同还是您不确定 #?
  • 它可以变化,数据来自外部组件...
  • @ademing,数字是按顺序到达的,但它们不是有序的(如 n1
  • 您需要一个运行平均值,还是提供最后一个完整 1 秒间隔的平均值就足够了?后者要简单得多。

标签: c# performance algorithm


【解决方案1】:

这是一个在某些情况下可以提高效率的算法:

  1. 当事件进入时,将它们完全缓冲,并计算一个正在运行的sumcountminmax(微不足道)。

  2. 当请求averageminmax 时,从缓冲区的后面循环并开始删除超过一秒的值。边走边从sumcount 中减去。

    • 如果值都高于min,您可以保留min。如果值低于max,您可以保留您的max。在这种情况下,您可以高效地更新averageminmax

    • 如果值低于min 或高于max,则需要遍历数组的其余部分并重新计算。

  3. 每秒大约执行一次第二步,这样缓冲区就不会太满。这段代码也可以在每个缓冲区插入上执行,或者在任何有意义的地方执行。

这种工作的最佳结构是循环缓冲区,以避免内存分配和 GC 妨碍。它应该足够大以涵盖每秒消息大小的最坏情况。

更新

根据使用场景,另一件事是运行上面的算法,但以 10 x 100ms 块而不是 1 x 1000ms 块的形式运行。也就是说,在这 10 个块上保持运行的 min、max、sum 和 count。那么当你到达“失效”场景时,你通常只需要查看最近 100ms 的数据或快速浏览其他 9 个块的最小值和最大值。


@ja72 提供了一个好主意,可以在最小值和最大值无效时节省查找它们的时间:

x_max 不是保留最小值/最大值 x_min,而是保留它们在 x[i] 数组中的位置索引,其中包含 i_min 和 i_max。然后找到它们有时可能是微不足道的,但是当考虑的最后一个值包含最小值和最大值时,需要扫描整个列表以建立新的限制。


Sam Holder 在 cmets 中有另一个好主意 - 保持一个始终排序的并行数组,这让您可以将数字从顶部或底部去掉,以便更轻松地找到新的最小值和最大值。但是,这里的插入速度会受到一点影响(需要保持有序)。


最终,正确的选择将取决于程序的使用特性。值的读取频率与插入频率?

【讨论】:

  • 通过在我的建议中使用联锁队列,可以永远不必寻找最小值和最大值(队列维护它们,除非添加或删除元素,否则无需进行任何工作) ,但您可能是对的,在大多数情况下,分配节点和维护排序列表的成本比必须再次查看列表以获取新的最大/最小值更多。
  • 这取决于查询的频率。如果不经常制作,则插入需要便宜,查找可能很昂贵。反之亦然。
  • 如果我使用循环缓冲区(一次分配的数组结合上下限),这是否意味着插入和删除(收缩)非常便宜?
  • @Dusan:插入将是 O(1)。如果您传递最大值或最小值,则删除可能是 O(n)。查找也可能很昂贵。
  • 如果我是正确的,找到超过一秒的最新元素的索引可以使用二分搜索来完成吗?
【解决方案2】:

使用循环缓冲区,每个元素都有时间戳和数据,每秒最大元素数作为循环缓冲区的大小。

当每个元素都插入缓冲区头部时,检查缓冲区另一侧是否过期,删除该元素。

如果删除的元素是最小值或最大值,则必须计算新的最小值/最大值。如果不是,您将根据新来者更新 min/max。

对于平均值,保留总数,保留计数,然后除法。

【讨论】:

    【解决方案3】:

    您不能在队列中保留您的号码及其到达时间,以及队列中当前的最大值和最小值(可能需要将值的数量保持在相同的最小值/最大值)和总数队列中所有数字的值和元素个数。

    然后当一个数字到达时将其添加到队列中并调整最小/最大值/值和计数。然后查看队列的另一端,将最后一个数字到达后1秒内没有的元素全部移除,再次调整max/min/count/total值。

    那么你就不需要在瞬间计算任何东西,只需返回预先计算的东西(即读取min/max或total/count的当前值)

    正如@yaman 指出的那样,你不能只保留最小值和最大值,因为当一个被删除时你可能不知道新的。在这种情况下,我可能只保留列表中所有数字的第二份副本,而不是按到达时间排序,而是按值排序。然后,您也只需从该列表中添加和删除每个数字,因此您将始终知道最大值和最小值。这样您就不必扫描缓冲区中的所有元素以找到新的最大/最小值,但代价是保留 2 个副本,但此列表的更新应该很便宜,因为它已经订购了。

    【讨论】:

    • 计数和总值可以随时调整。不能通过删除值来调整最小值和最大值,因为当旧值无效时,需要对所有值进行全面扫描才能找到新值。我的回答涵盖了最坏的情况。
    • 保持排序列表是多余的。而是保留某种优先级队列,例如堆。那些需要对数的工作来获得最大值,插入和删除。但是,您需要一个用于最大值,另一个用于最小值,因此您需要 3 个列表。
    • @btilly 使用堆而不是排序列表的好建议。读取 max/min 是 O(1),它正在删除 O(log n) 的 max/min 元素,因为需要工作来维护堆。正如我在对列表的回答中指出的那样,您需要为每个元素使用一个节点对象(尽管使用了两个堆和一个列表),因此您可以在发现它们已过期时从堆中删除元素,否则删除将是 O(n) 因为您必须在每个堆中搜索过期元素。使用单个节点意味着当我们从列表中删除过期元素时,我们知道元素在每个堆中的位置。
    【解决方案4】:

    @DanRedux 是正确的;您每次都需要计算它们,因为您的输入正在改变。现在,您可能更喜欢按需或预先(即,当您获得新批次时)计算这些数字,具体取决于需要结果的频率。

    例如,如果您的平均用例每约 30 秒轮询一次这些统计信息,那么我可能会按需计算它们并缓存结果,直到有新批次进来。不过,这实际上取决于您的使用场景。

    至于如何存储它们,你真的别无选择,是吗?您需要内存中所有 50,000 个数字的空间。所以......你需要一块足够大的内存来容纳它们。为了避免每次有新序列进入时不断分配 2KB,您最好声明一个足够大的数组以容纳可能的最大数据集并重用它。同样,这取决于您的要求,即您是否知道您最大的可能数据集是什么?随着时间的推移,每秒钟分配一块新的内存是否会导致您的应用程序出现问题?

    【讨论】:

      【解决方案5】:

      如果最后一个 N 值的平均值 x[0] .. x[N-1]m_1x[0] 是最新值,x[N-1] 是最后考虑的值)然后平均 m_2将所有内容推回一个索引并添加值x 的值是

       m_2 = m_1+(x-x[N-1])/N;
       for(i=N-1;i>0;i--) { x[i]=x[i-1]; }
       x[0] = x;
      

      x_minx_max 不保留最小值/最大值,而是保留它们在 x[i] 数组中的位置索引,i_mini_max。然后找到它们有时可能是微不足道的,但是当考虑的最后一个值包含最小值和最大值时,需要扫描整个列表以建立新的限制。

      【讨论】:

      • 最小和最大索引的绝妙技巧,非常方便!
      【解决方案6】:

      有一种有效的方法可以跟踪给定时间窗口内的最小值(或最大值),而通常不必存储在该窗口内到达的所有数字。 (但是,最坏的情况仍然需要存储所有数字,因此您需要为所有数字保留空间,或者接受有时可能会得到不正确的结果。)

      诀窍是只存储以下值:

      1. 已在时间窗口内到达,并且
      2. 小于(或大于)任何以后的值。

      实现这一点的合适数据结构是一个简单的循环缓冲区,用于存储值及其到达时间。您需要在缓冲区中维护两个索引。下面是该算法的简单英文描述:

      启动时:

      • 分配一个 N 元素缓冲区 val 值和一个对应的 N 元素缓冲区 time 时间戳。
      • imax = 0(或介于0 和N-1 之间的任何其他值)并令inext = imax。这表示缓冲区当前为空。

      当一个新的值new收到时t

      • imaxinexttime[imax] 在区间外时,将imax 加一(模N)。
      • imaxinextval[inext-1]new时,将inext减一(模N)。
      • val[inext] = newtime[inext] = t
      • 如果inextimax-1,则将inext加一(模N);否则适当地处理“缓冲区已满”的情况(例如,分配一个更大的缓冲区,抛出一个异常,或者只是忽略它并接受最后一个值没有正确记录)。

      当请求最小值时:

      • imaxinexttime[imax] 在区间外时,将imax 加一(模N)。
      • 如果imaxinext,则返回val[imax];否则返回一个错误,表明在时间间隔内没有收到任何值。

      如果接收到的值是独立同分布的(并且作为泊松过程到达),我相信可以证明在任何给定时间存储在列表中的值的平均数是 ln(n em>+1),其中 n 是时间间隔内接收到的值的平均数。对于 n = 50,000,ln(n+1) ≈ 10.82。但是,请记住,这只是平均值,有时可能需要多几倍的空间。


      不幸的是,同样的技巧通常不起作用。如果可能的话,您可以切换到exponentially moving average,它可以使用非常小的空间轻松跟踪(只有一个数字表示平均值,一个时间戳表示上次更新的时间)。

      如果这是不可能的,但您愿意接受平均值的少量平滑,您可以计算一个平均值,例如,每毫秒。这样,每当请求最后一秒的平均值时,您可以取最后 1001 毫秒平均值的平均值,根据间隔内的毫秒数衡量最旧和最新的平均值:

      启动时:

      • interval 为平均时间间隔的长度,令 n 为子间隔数。
      • dt = interval / n
      • 分配一个 n+1 元素缓冲区 sum 的值和一个 n+1 元素缓冲区 cnt 的非负整数,并填充两者用零。
      • prev 有任何值。 (这并不重要。)

      当一个新值new接收到的时候t

      • i = floor(t / dt) mod (n+1)。
      • 如果iprev
        • total 中减去sum[i] 和从count 中减去cnt[i]
        • sum[i] = 0,cnt[i]= 0 并让prev = i
      • new 添加到sum[i] 并将cnt[i] 加一。
      • new 添加到total 并将count 加一。

      当请求平均值时 t:

      • i = floor(t / dt) mod (n+1)。
      • 如果iprev
        • total 中减去sum[i],从count 中减去cnt[i]
        • sum[i] = 0,cnt[i]= 0,让prev = i
      • j = (in) mod (n+1) = (i+1) mod (n+1)。
      • w = frac(t / dt) = (t / dt) - floor(t / dt)。
      • 返回 (total - w × sum[j]) / (count - w × cnt[j])。

      【讨论】:

        【解决方案7】:

        很遗憾,没有。不可能的原因是你只需要考虑第二个旧的,这意味着你每次都必须重新计算结果,这意味着巨大的循环。

        如果你想计算最后的 40,000 个数字,或者所有这些数字,会更容易,但因为它是基于时间的,你必须每次都遍历整个列表。

        【讨论】:

        • 您只需要保留最后一秒的数据
        • 不是真的。查看其他答案。
        • 同意,有一些聪明的方法可以解决这个问题。
        • 不可能避免循环,这就是我要说的。即使只是保持最小值也成为 O(n) 问题,而不是 O(1) 问题,因为当当前最小值丢失时,您必须扫描 50,000 个值的整个缓冲区以查找新最小值。当然,它可以提高效率,但如果你限制的是时间而不是静态数字,那么就不可能完全避免循环。
        • 您只需要扫描最后一秒的数据。每次读取的数据量相同。因此,O(1)。
        【解决方案8】:

        有没有办法在不使用数组或列表(缓冲区)的情况下做到这一点 存储到达的数字并计算结果?

        没有。正如您所说,如果不存储信息,就不可能做到这一点。不过,您可以稍微调整要求以摆脱对缓冲区的需求。

        如果我需要使用缓冲区,那么有效的方法是什么 这个?

        您需要为此使用队列。

        添加项目时,如果是新的最大值或最小值,请相应地调整这些变量。您可以通过公式here 逐步调整平均值。只需将新值减去平均值,除以集合中的新项目数(即队列大小加一),然后将其添加到旧平均值。

        那么你或多或少会是这样的:

        while(queue.Peek < oneSecondAgo)
        {
          oldItem = queue.Peek
          queue.Dequeue();
          if(oldItem == min) //recalculate min
          if(oldItem == max) //recalculate max
          mean += SubtractValueFromMean(oldItem.Value, queue.Count);
        }
        

        要从平均值中删除值,您应该能够使用相同的公式进行加法,但使用值的负数而不是正数...我认为。一个更好的数学家可能需要在这里帮助你。

        【讨论】:

          【解决方案9】:

          如果数字一个接一个地出现,则使用秒表和 while 循环在一秒钟内一个一个地获取每个数字,并计算最小值、最大值和平均值。

          double min = double.MaxValue;
          double max = double.MinValue;
          double sum = 0;
          int count = 0;
          double avg;
          StopWatch sw = new StopWatch();
          sw.Start();
          while(sw.Elapsed.TotalSeconds <= 1)
          {
             // Get the next number in the stream of numbers
             double d = GetNextNumber();
          
             // Calculate min
             if(d < min) min = d;
             // Calculate max
             if(d > max) max = d;
          
             // Calculate avg = sum/ count
             sum += d;
             count++;
          }
          
          avg = sum/count;
          

          然后返回最小值、最大值和平均值。

          【讨论】:

            【解决方案10】:

            不保留缓冲区或队列中的数字是不可能的。

            原因很简单:当最大值过期(超出 1 秒窗口)时,新的最大值是在最后一秒内到达的其他数字,因此您需要记录可能成为新的最大值。

            需要平均值意味着所有值在过期时都会产生影响,并且在它一秒之前不能丢弃任何东西。

            Sam Holder 建议使用队列是一个很好的建议,尽管您可能需要一个专门的队列来同时将您的列表保持在两个顺序中:接收号码的顺序(到达时间),以及从最大到最小。

            使用带有两个 next 和两个 previous 指针的单个节点对象(一对在时间上,另一个在大小方面)可以同时从两个列表中删除元素,当一个元素从临时列表中过期时,您可以访问大小列表的指针,因为它们在同一个节点对象中。

            可以通过保持运行总数和运行计数来保持平均值,在删除元素时减去它们,并在创建它们时添加它们,因此不必每次都遍历整个列表来计算平均值。

            正如 btilly 在他们对 Sam Holder 帖子的评论中所建议的那样,使用最大堆和最小堆比使用列表更有效,我们将再次需要使用单个节点,同时为堆和列表,因此我们不必搜索元素来删除它们,并且可能需要花一些时间考虑如何正确删除不在堆顶部的元素,同时保持 O(log n) 插入的保证和删除。

            【讨论】:

              【解决方案11】:

              平均而言,需要考虑 3 种情况:

              1. 您的数字是整数。保持运行总计和计数,添加新的 值到总计,从总计中减去旧值,然后除以 根据需要计数。这很简单,因为您不必 担心精度损失。
              2. 您的数字是浮点数,您需要 0 损失 精度:您必须遍历整个一秒列表以 计算平均值
              3. 你的数字是浮点数,你可以忍受一些损失 精度:按整数平均操作,做一个完整的 每 1000 个左右的值重新计算一次。

              对于最小值和最大值(仅与上面的 #1 和 #3 相关):

              • 将值保存在按值索引的trap 中。
              • 还将值保存在按时间排序的双向链表中。保存开头和结尾 列表。
              • 从列表的开头删除,并添加到列表的末尾 列表。
              • 对于每个新值:将其添加到时间链表的开头。 根据需要从时间链表的末尾删除值。

              当您在链表中添加和删除值时,对treap 执行相应的操作。要从 treap 中获取最小值和最大值,只需在 log(n) 时间内执行 find_minimum 和 find_maximum 操作。当你在恒定时间内从链表的右端删除东西时,也要在 log(n) 时间内从treap 中删除它们。

              Treaps 可以在 log(n) 时间内找到它们的最小值,在 log(n) 时间内找到它们的最大值,并在 log(n) 时间内找到任意值。一般来说,访问数据所需的不同方式越多,像treap 这样全面的数据结构就越好。

              【讨论】:

                猜你喜欢
                • 1970-01-01
                • 2021-01-25
                • 2016-11-04
                • 1970-01-01
                • 1970-01-01
                • 2018-07-07
                • 1970-01-01
                • 2014-11-20
                • 1970-01-01
                相关资源
                最近更新 更多