【问题标题】:Help with algorithm for merging vectors帮助合并向量的算法
【发布时间】:2010-09-10 12:05:58
【问题描述】:

我需要一个非常快速的算法来完成以下任务。我已经实现了几种完成它的算法,但是它们对于我需要的性能来说都太慢了。它应该足够快,以至于算法可以在现代 CPU 上每秒至少运行 100,000 次。它将在 C++ 中实现。

我正在使用跨度/范围,一种在一条线上具有起点和终点坐标的结构。

我有两个跨度向量(动态数组),我需要合并它们。一个向量是 src,另一个是 dst。向量按跨度起始坐标排序,跨度在一个向量内不重叠。

src 向量中的 span 必须与 dst 向量中的 span 合并,这样得到的向量仍然是有序的并且没有重叠。 IE。如果在合并期间检测到重叠,则将两个跨度合并为一个。 (合并两个跨度只是改变结构中的坐标。)

现在,还有一个问题,src 向量中的跨度必须在合并期间“加宽”。这意味着将在 src 中每个跨度的开始坐标添加一个常量,并将另一个(更大的)常量添加到结束坐标。这意味着在 src 跨度扩大后,它们可能会重叠。


到目前为止,我得出的结论是它不能完全就地完成,需要某种临时存储。我认为它应该在线性时间内超过 src 和 dst 的总和。

任何临时存储都可能在算法的多次运行之间共享。

我尝试过的两种主要方法都太慢了,它们是:

  1. 将 src 的所有元素追加到 dst,在追加之前加宽每个元素。然后运行就地排序。最后,使用“读”和“写”指针遍历结果向量,读指针在写指针之前运行,在它们进行时合并跨度。当所有元素都被合并(读指针到达末尾)dst 被截断。

  2. 创建一个临时工作向量。通过重复从 src 或 dst 中选择下一个元素并合并到工作向量中来进行如上所述的简单合并。完成后,将工作向量复制到 dst 并替换它。

第一种方法的问题是排序是 O((m+n)*log(m+n)) 而不是 O(m+n) 并且有一些开销。这也意味着 dst 向量必须比它真正需要的大得多。

第二个主要问题是大量复制并再次分配/释放内存。

如果您认为需要,可以更改用于存储/管理跨度/向量的数据结构。

更新:忘了说数据集有多大。最常见的情况是任一向量中有 4 到 30 个元素,并且 dst 为空或 src 和 dst 中的 span 之间存在大量重叠。

【问题讨论】:

    标签: c++ algorithm optimization graphics vector


    【解决方案1】:

    不重复分配的第二种方法怎么样 - 换句话说,分配你的临时向量一次,不再分配它?或者,如果输入向量足够小(但不是恒定大小),只需使用 alloca 而不是 malloc。

    此外,在速度方面,您可能需要确保您的代码使用 CMOV 进行排序,因为如果代码实际上是为合并排序的每个迭代进行分支:

    if(src1[x] < src2[x])
        dst[x] = src1[x];
    else
        dst[x] = src2[x];
    

    分支预测将在 50% 的情况下失败,这将对性能产生巨大影响。有条件的移动可能会做得更好,因此请确保编译器正在这样做,如果没有,请尝试诱使它这样做。

    【讨论】:

      【解决方案2】:

      您在方法 1 中提到的排序可以减少为线性时间(从您描述的对数线性),因为两个输入列表已经排序。只需执行合并排序的合并步骤。通过输入跨度向量的适当表示(例如单链表),这可以就地完成。

      http://en.wikipedia.org/wiki/Merge_sort

      【讨论】:

        【解决方案3】:

        我认为严格的线性解决方案是不可能的,因为在最坏的情况下,扩大 src 向量跨度可能会导致它们全部重叠(取决于您要添加的常数的大小)

        问题可能出在实现上,而不是算法上;我建议为您之前的解决方案分析代码,以查看时间花在哪里

        推理:

        对于像英特尔酷睿 2 Extreme QX9770 这样运行在 3.2GHz 的真正“现代”CPU,可以预期大约 59,455 MIPS

        对于 100,000 个向量,您必须在 594,550 条指令中处理每个向量。有很多说明。

        参考:wikipedia MIPS

        另外,请注意,给 src 向量 spans 添加一个常数不会对它们进行反排序,因此您可以独立地对 src 向量 spans 进行归一化,然后将它们与 dst 向量 spans 合并;这应该会减少您原始算法的工作量

        【讨论】:

        • 100k 实际上可能是一个下冲,我还没有真正计算出这个数字。此外,当我说“现代”CPU 时,我实际上是在想“5 岁以内的东西”,Athlon XP 3000+ 并不是一个不切实际的目标。
        【解决方案4】:

        我们知道绝对最佳情况的运行时间是 O(m+n),这是因为您至少必须扫描所有数据才能合并列表。鉴于此,您的第二种方法应该为您提供这种类型的行为。

        您是否分析过第二种方法以找出瓶颈所在?很有可能,根据您所谈论的数据量,实际上不可能在指定的时间内完成您想做的事情。验证这一点的一种方法是做一些简单的事情,比如在一个循环中总结每个向量中跨度的所有开始和结束值,然后计算时间。基本上,您在这里为向量中的每个元素做最少的工作。这将为您提供一个基准,以实现您期望获得的最佳性能。

        除此之外,您可以通过使用stl swap方法避免逐个元素地复制向量,并且您可以将临时向量预先分配到一定的大小,以避免在合并元素时触发数组的扩展。

        您可能会考虑在系统中使用 2 个向量,并且每当您需要进行合并时,您会合并到未使用的向量中,然后交换(这类似于图形中使用的双缓冲)。这样您就不必在每次进行合并时重新分配向量。

        但是,您最好先进行分析并找出您的瓶颈所在。如果与实际合并过程相比分配最小,那么您需要弄清楚如何使其更快。

        一些可能的额外加速可能来自直接访问向量原始数据,这避免了每次访问数据时的边界检查。

        【讨论】:

        • 感谢您提醒我 std::swap,它实际上可能会破坏交易。测试完我会回来的;)
        【解决方案5】:

        1 是正确的 - 完全排序比合并两个排序列表要慢。

        所以你正在考虑调整 2(或全新的东西)。

        如果将数据结构更改为双向链表,则可以将它们合并到恒定的工作空间中。

        为列表节点使用固定大小的堆分配器,既可以减少每个节点的内存使用量,又可以提高节点在内存中靠近的机会,从而减少页面未命中。

        您也许可以在网上或您最喜欢的算法书中找到代码来优化链表合并。您需要对其进行自定义,以便在列表合并的同时进行跨度合并。

        要优化合并,首先请注意,对于来自同一侧但没有来自另一侧的值的每次运行,您可以一次将整个运行插入到 dst 列表中,而不是依次插入每个节点.并且您可以在正常的列表操作中每次插入保存一次写入,方法是让末尾“悬空”,因为您知道稍后会对其进行修补。如果您不在应用程序的其他任何地方进行删除,则列表可以是单链接的,这意味着每个节点一次写入。

        至于 10 微秒的运行时间 - 有点取决于 n 和 m...

        【讨论】:

          【解决方案6】:

          如果您最近的实施仍然不够快,您可能最终不得不寻找替代方法。

          你用这个函数的输出做什么?

          【讨论】:

          • 嗯,你错过了一件事,不只是一个“增量”,还有两个。 span的左右加值不同,具体来说右边比左边加值大。
          【解决方案7】:

          我为这个算法编写了一个新的容器类,根据需要量身定制。这也让我有机会围绕我的程序调整其他代码,同时提高了一点速度。

          这比使用 STL 向量的旧实现要快得多,但在其他方面基本相同。但是虽然它更快,但仍然不够快......不幸的是。

          分析不再揭示真正的瓶颈是什么。 MSVC 分析器似乎有时会将“责任”归咎于错误的调用(假设相同的运行分配了大不相同的运行时间)并且大多数调用都被合并为一个大裂缝。

          查看生成的代码的反汇编显示,生成的代码中有大量的跳转,我认为这可能是现在缓慢的主要原因。

          class SpanBuffer {
          private:
              int *data;
              size_t allocated_size;
              size_t count;
          
              inline void EnsureSpace()
              {
                  if (count == allocated_size)
                      Reserve(count*2);
              }
          
          public:
              struct Span {
                  int start, end;
              };
          
          public:
              SpanBuffer()
                  : data(0)
                  , allocated_size(24)
                  , count(0)
              {
                  data = new int[allocated_size];
              }
          
              SpanBuffer(const SpanBuffer &src)
                  : data(0)
                  , allocated_size(src.allocated_size)
                  , count(src.count)
              {
                  data = new int[allocated_size];
                  memcpy(data, src.data, sizeof(int)*count);
              }
          
              ~SpanBuffer()
              {
                  delete [] data;
              }
          
              inline void AddIntersection(int x)
              {
                  EnsureSpace();
                  data[count++] = x;
              }
          
              inline void AddSpan(int s, int e)
              {
                  assert((count & 1) == 0);
                  assert(s >= 0);
                  assert(e >= 0);
                  EnsureSpace();
                  data[count] = s;
                  data[count+1] = e;
                  count += 2;
              }
          
              inline void Clear()
              {
                  count = 0;
              }
          
              inline size_t GetCount() const
              {
                  return count;
              }
          
              inline int GetIntersection(size_t i) const
              {
                  return data[i];
              }
          
              inline const Span * GetSpanIteratorBegin() const
              {
                  assert((count & 1) == 0);
                  return reinterpret_cast<const Span *>(data);
              }
          
              inline Span * GetSpanIteratorBegin()
              {
                  assert((count & 1) == 0);
                  return reinterpret_cast<Span *>(data);
              }
          
              inline const Span * GetSpanIteratorEnd() const
              {
                  assert((count & 1) == 0);
                  return reinterpret_cast<const Span *>(data+count);
              }
          
              inline Span * GetSpanIteratorEnd()
              {
                  assert((count & 1) == 0);
                  return reinterpret_cast<Span *>(data+count);
              }
          
              inline void MergeOrAddSpan(int s, int e)
              {
                  assert((count & 1) == 0);
                  assert(s >= 0);
                  assert(e >= 0);
          
                  if (count == 0)
                  {
                      AddSpan(s, e);
                      return;
                  }
          
                  int *lastspan = data + count-2;
          
                  if (s > lastspan[1])
                  {
                      AddSpan(s, e);
                  }
                  else
                  {
                      if (s < lastspan[0])
                          lastspan[0] = s;
                      if (e > lastspan[1])
                          lastspan[1] = e;
                  }
              }
          
              inline void Reserve(size_t minsize)
              {
                  if (minsize <= allocated_size)
                      return;
          
                  int *newdata = new int[minsize];
          
                  memcpy(newdata, data, sizeof(int)*count);
          
                  delete [] data;
                  data = newdata;
          
                  allocated_size = minsize;
              }
          
              inline void SortIntersections()
              {
                  assert((count & 1) == 0);
                  std::sort(data, data+count, std::less<int>());
                  assert((count & 1) == 0);
              }
          
              inline void Swap(SpanBuffer &other)
              {
                  std::swap(data, other.data);
                  std::swap(allocated_size, other.allocated_size);
                  std::swap(count, other.count);
              }
          };
          
          
          struct ShapeWidener {
              // How much to widen in the X direction
              int widen_by;
              // Half of width difference of src and dst (width of the border being produced)
              int xofs;
          
              // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
              SpanBuffer buffer;
          
              inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);
          
              ShapeWidener(int _xofs) : xofs(_xofs) { }
          };
          
          
          inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
          {
              if (src.GetCount() == 0) return;
              if (src.GetCount() + dst.GetCount() == 0) return;
          
              assert((src.GetCount() & 1) == 0);
              assert((dst.GetCount() & 1) == 0);
          
              assert(buffer.GetCount() == 0);
          
              dst.Swap(buffer);
          
              const int widen_s = xofs - widen_by;
              const int widen_e = xofs + widen_by;
          
              size_t resta = src.GetCount()/2;
              size_t restb = buffer.GetCount()/2;
              const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
              const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();
          
              while (resta > 0 || restb > 0)
              {
                  if (restb == 0)
                  {
                      dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
                      --resta, ++spa;
                  }
                  else if (resta == 0)
                  {
                      dst.MergeOrAddSpan(spb->start, spb->end);
                      --restb, ++spb;
                  }
                  else if (spa->start < spb->start)
                  {
                      dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
                      --resta, ++spa;
                  }
                  else
                  {
                      dst.MergeOrAddSpan(spb->start, spb->end);
                      --restb, ++spb;
                  }
              }
          
              buffer.Clear();
          }
          

          【讨论】:

          • 尝试使用双端队列而不是向量。 Deque 分配的内存更少,但代价是不是连续的内存。
          【解决方案8】:

          我会始终保持我的跨度向量排序。这使得算法的实现变得更加容易——并且可以在线性时间内完成。

          好的,所以我会根据以下条件对跨度进行排序:

          • 按升序排列的最小跨度
          • 然后按降序排列最大跨度

          您需要创建一个函数来执行此操作。

          然后我会使用 std::set_union 来合并向量(您可以在继续之前合并多个)。

          然后对于具有相同最小值的每组连续跨度,您保留第一个并删除其余的(它们是第一个跨度的子跨度)。

          然后你需要合并你的跨度。现在这应该是相当可行的,并且在线性时间内是可行的。

          好的,这就是诀窍。不要尝试就地执行此操作。使用一个或多个临时向量(并提前预留足够的空间)。然后在最后,调用 std::vector::swap 将结果放入您选择的输入向量中。

          我希望这足以让你继续前进。

          【讨论】:

            【解决方案9】:

            您的目标系统是什么?是多核的吗?如果是这样你可以考虑多线程这个算法

            【讨论】:

            • 我的目标系统是过去 5 年左右的桌面系统,我不能假设任何关于 SMP 支持或 SIMD 指令集的事情。 (好吧,我可以假设 x86 上的 MMX,但仅此而已。)
            猜你喜欢
            • 2019-02-04
            • 2021-03-22
            • 1970-01-01
            • 1970-01-01
            • 2010-12-04
            • 2011-03-27
            • 1970-01-01
            • 1970-01-01
            • 2015-07-12
            相关资源
            最近更新 更多