【问题标题】:What's the algorithm for 2:1 balancing a linear octree?2:1 平衡线性八叉树的算法是什么?
【发布时间】:2014-08-20 05:45:44
【问题描述】:

我有一个自上而下的过程,它从 3D 对象的高级描述构建线性八叉树(例如,叶子排列成数组并按莫顿编码排序)。问题是,对于我的预期应用,生成的八叉树必须是 2:1 平衡的,即不能有任何一对相邻块,其中一个块的大小超过另一个块的两倍。

我唯一能找到的是文章“Bottom Up Construction and 2:1 Balance Refinement of Linear Octrees in Parallel”(您从多个来源找到它但版权不清楚,不知道链接事物的政策是什么就像这个网站上的那样),它解释了一个算法来做到这一点。问题是所提出的算法在并行消息传递架构中工作,这对我的应用程序来说太过分了。另一个问题是(自下而上的)构造和平衡算法似乎捆绑在一起,我不知道如何仅在用我自己的方法构造树之后才能平衡它。

那么 2:1 平衡线性八叉树的(希望是简单且)有效的方法是什么?并行算法也很好,但使用共享内存模型,而不是像链接算法那样的消息传递。

【问题讨论】:

    标签: algorithm octree


    【解决方案1】:

    最简单的顺序算法可能是保留一个包含未处理八叉树节点的队列,并通过确保其三个级别 1 的非兄弟邻居存在来依次处理每个八叉树节点。在处理过程中创建的附加节点进入队列。

    -----------------
    | | | | |       |
    ---------       |
    | | | | |       |
    ---------       |
    | | |d| |       |
    --b------       |
    | | |a|e|       |
    -----------------
    |       |       |
    |       |       |
    |       |       |
    |   c   |       |
    |       |       |
    |       |       |
    |       |       |
    -----------------
    

    这里 a 的两个(四叉树)级别 - 1 个非兄弟邻居是 b 和一个尚不存在的 c 的孩子。节点 d 和 e 是(同一级别的)兄弟邻居。

    该算法的复杂部分是确定如何找到这些邻居。这可以通过计算节点及其非兄弟邻居的坐标,Morton 对其进行编码,然后依次在该节点及其每个邻居的编码的 XOR 中寻找最重要的设置位来完成。这个位超过三加一的索引就是需要遍历的父链接数。

    例如,让我们使用 yxyxyx 作为四叉树图的 Morton 交错。节点 a 是从 (2,4) 到 (3,5) 的正方形,莫顿坐标为 100100。节点 b 是从 (0,4) 到 (2,6) 的正方形,莫顿坐标为 100000。节点 c 是从 (0,0) 到 (4,4) 的正方形,莫顿坐标为 000000。节点 d 是从 (2,5) 到 (3,6) 的正方形,莫顿坐标为 100110。节点 e 是 (3) 的正方形,4) 到 (4,5),莫顿坐标为 100101。

    为了找到 a 的邻居,我们对 (2+1, 4) 和 (2-1, 4) 和 (2, 4+1) 和 (2, 4-1) 进行编码。让我们做 (2, 4-1) = (2,3)。 (2,3)的Morton编码为001110。与a的Morton编码相比,

        001110
        100100
    XOR ------
        101010
        \/\/\/
    

    由于两位的第三低有效组是非零的,因此我们根据 Morton 代码上升到叶级减三(即来自 a 的三个父链接),然后下降两次(三减一)。当我们遇到不存在的孩子时,就像第二个降序步骤一样,我们根据需要制作它们并将它们添加到队列中。

    并行版

    由于我在顺序算法中避免了一些复杂的优化,所以它对处理顺序的变化甚至并行性都是稳健的,只要在分裂八叉树节点时没有竞争。对于共享内存并行性,给定一个比较和交换原语,很容易构造一个无子锁(分配一个节点,然后从 null CAS 适当的子指针;如果 CAS 失败,那么只需读取获胜的孩子)。鉴于 CAS 可用,队列的有效共享集合不应该太难(它不必是 FIFO)。我不知道这里会提供什么样的加速并行性。

    【讨论】:

    • 谢谢,但有几个部分不清楚。您能否澄清有关查找不存在节点的部分?这个 XOR 技巧究竟做了什么?此外,您可以快速添加创建的节点:“我们根据需要制作它们”。鉴于它是一个数组,我如何将它们插入八叉树的线性表示中?如何在没有插入线性开销的情况下添加元素?
    • @gigabytes 抱歉,我假设(错误地,事实证明)您使用的是带有父/子指针的标准八叉树表示。我需要考虑如何处理数组表示。
    • @gigabytes 插入会很慢,除非它们是成批完成的。我建议您将此步骤的表示切换为基于指针的结构,然后使用深度优先遍历恢复排序顺序。 XOR 技巧利用 Morton 编码的每个 d 位组本质上指定一个子索引这一事实;我们正在寻找节点及其邻居的最叶共同祖先,以便我们可以有效地导航到该邻居。在二叉树中寻找节点后继的问题使用了类似的逻辑。
    • 好的。但是在二叉树中,我有一个继任者。在这里我必须找到所有周围的邻居来检查平衡,还是我错过了什么?
    • @gigabytes 要在完整的二叉树中找到下一个(向右)叶节点,您将父指针上升,直到您来自某个节点 u 的左子节点,然后通过 u 的右子节点的左子节点下降子孙。在四叉树/八叉树中,逻辑是提升父指针,直到您来自未位于父正方形/立方体中最右边的孩子,然后类似地下降。
    【解决方案2】:

    您引用的论文是我小组以前的工作。由于您询问了版权,因此可以从我们的群组网页获得 GPL2 下的代码:http://padas.ices.utexas.edu/software

    一个新代码(pvfmm,我正在研究)也可以从上面的链接下载,并且在 GPL3 下可用。这个有一个更简单且非常有效的平衡算法。 算法在本文中描述:http://padas.ices.utexas.edu/static/papers/pvfmm.pdf

    我已修改以下代码以删除分布式内存并行性。 函数balanceOctree实现了算法。

    #include <vector>
    #include <set>
    #include <cstdlib>
    #include <cmath>
    #include <stdint.h>
    #include <omp.h>
    #include <algorithm>
    #include <cstring>
    
    #ifndef MAX_DEPTH
    #define MAX_DEPTH 30
    #endif
    
    #if MAX_DEPTH < 7
    #define UINT_T uint8_t
    #define  INT_T  int8_t
    #elif MAX_DEPTH < 15
    #define UINT_T uint16_t
    #define  INT_T  int16_t
    #elif MAX_DEPTH < 31
    #define UINT_T uint32_t
    #define  INT_T  int32_t
    #elif MAX_DEPTH < 63
    #define UINT_T uint64_t
    #define  INT_T  int64_t
    #endif
    
    
    namespace omp_par{
    
    template <class T,class StrictWeakOrdering>
    void merge(T A_,T A_last,T B_,T B_last,T C_,int p,StrictWeakOrdering comp){
      typedef typename std::iterator_traits<T>::difference_type _DiffType;
      typedef typename std::iterator_traits<T>::value_type _ValType;
    
      _DiffType N1=A_last-A_;
      _DiffType N2=B_last-B_;
      if(N1==0 && N2==0) return;
      if(N1==0 || N2==0){
        _ValType* A=(N1==0? &B_[0]: &A_[0]);
        _DiffType N=(N1==0?  N2  :  N1   );
        #pragma omp parallel for
        for(int i=0;i<p;i++){
          _DiffType indx1=( i   *N)/p;
          _DiffType indx2=((i+1)*N)/p;
          memcpy(&C_[indx1], &A[indx1], (indx2-indx1)*sizeof(_ValType));
        }
        return;
      }
    
      //Split both arrays ( A and B ) into n equal parts.
      //Find the position of each split in the final merged array.
      int n=10;
      _ValType* split=new _ValType[p*n*2];
      _DiffType* split_size=new _DiffType[p*n*2];
      #pragma omp parallel for
      for(int i=0;i<p;i++){
        for(int j=0;j<n;j++){
          int indx=i*n+j;
          _DiffType indx1=(indx*N1)/(p*n);
          split   [indx]=A_[indx1];
          split_size[indx]=indx1+(std::lower_bound(B_,B_last,split[indx],comp)-B_);
    
          indx1=(indx*N2)/(p*n);
          indx+=p*n;
          split   [indx]=B_[indx1];
          split_size[indx]=indx1+(std::lower_bound(A_,A_last,split[indx],comp)-A_);
        }
      }
    
      //Find the closest split position for each thread that will
      //divide the final array equally between the threads.
      _DiffType* split_indx_A=new _DiffType[p+1];
      _DiffType* split_indx_B=new _DiffType[p+1];
      split_indx_A[0]=0;
      split_indx_B[0]=0;
      split_indx_A[p]=N1;
      split_indx_B[p]=N2;
      #pragma omp parallel for
      for(int i=1;i<p;i++){
        _DiffType req_size=(i*(N1+N2))/p;
    
        int j=std::lower_bound(&split_size[0],&split_size[p*n],req_size,std::less<_DiffType>())-&split_size[0];
        if(j>=p*n)
          j=p*n-1;
        _ValType  split1     =split     [j];
        _DiffType split_size1=split_size[j];
    
        j=(std::lower_bound(&split_size[p*n],&split_size[p*n*2],req_size,std::less<_DiffType>())-&split_size[p*n])+p*n;
        if(j>=2*p*n)
          j=2*p*n-1;
        if(abs(split_size[j]-req_size)<abs(split_size1-req_size)){
          split1     =split   [j];
          split_size1=split_size[j];
        }
    
        split_indx_A[i]=std::lower_bound(A_,A_last,split1,comp)-A_;
        split_indx_B[i]=std::lower_bound(B_,B_last,split1,comp)-B_;
      }
      delete[] split;
      delete[] split_size;
    
      //Merge for each thread independently.
      #pragma omp parallel for
      for(int i=0;i<p;i++){
        T C=C_+split_indx_A[i]+split_indx_B[i];
        std::merge(A_+split_indx_A[i],A_+split_indx_A[i+1],B_+split_indx_B[i],B_+split_indx_B[i+1],C,comp);
      }
      delete[] split_indx_A;
      delete[] split_indx_B;
    }
    
    template <class T,class StrictWeakOrdering>
    void merge_sort(T A,T A_last,StrictWeakOrdering comp){
      typedef typename std::iterator_traits<T>::difference_type _DiffType;
      typedef typename std::iterator_traits<T>::value_type _ValType;
    
      int p=omp_get_max_threads();
      _DiffType N=A_last-A;
      if(N<2*p){
        std::sort(A,A_last,comp);
        return;
      }
    
      //Split the array A into p equal parts.
      _DiffType* split=new _DiffType[p+1];
      split[p]=N;
      #pragma omp parallel for
      for(int id=0;id<p;id++){
        split[id]=(id*N)/p;
      }
    
      //Sort each part independently.
      #pragma omp parallel for
      for(int id=0;id<p;id++){
        std::sort(A+split[id],A+split[id+1],comp);
      }
    
      //Merge two parts at a time.
      _ValType* B=new _ValType[N];
      _ValType* A_=&A[0];
      _ValType* B_=&B[0];
      for(int j=1;j<p;j=j*2){
        for(int i=0;i<p;i=i+2*j){
          if(i+j<p){
            merge(A_+split[i],A_+split[i+j],A_+split[i+j],A_+split[(i+2*j<=p?i+2*j:p)],B_+split[i],p,comp);
          }else{
            #pragma omp parallel for
            for(int k=split[i];k<split[p];k++)
              B_[k]=A_[k];
          }
        }
        _ValType* tmp_swap=A_;
        A_=B_;
        B_=tmp_swap;
      }
    
      //The final result should be in A.
      if(A_!=&A[0]){
        #pragma omp parallel for
        for(int i=0;i<N;i++)
          A[i]=A_[i];
      }
    
      //Free memory.
      delete[] split;
      delete[] B;
    }
    
    template <class T>
    void merge_sort(T A,T A_last){
      typedef typename std::iterator_traits<T>::value_type _ValType;
      merge_sort(A,A_last,std::less<_ValType>());
    }
    
    template <class T, class I>
    T reduce(T* A, I cnt){
      T sum=0;
      #pragma omp parallel for reduction(+:sum)
      for(I i = 0; i < cnt; i++)
        sum+=A[i];
      return sum;
    }
    
    template <class T, class I>
    void scan(T* A, T* B,I cnt){
      int p=omp_get_max_threads();
      if(cnt<(I)100*p){
        for(I i=1;i<cnt;i++)
          B[i]=B[i-1]+A[i-1];
        return;
      }
    
      I step_size=cnt/p;
    
      #pragma omp parallel for
      for(int i=0; i<p; i++){
        int start=i*step_size;
        int end=start+step_size;
        if(i==p-1) end=cnt;
        if(i!=0)B[start]=0;
        for(I j=(I)start+1; j<(I)end; j++)
          B[j]=B[j-1]+A[j-1];
      }
    
      T* sum=new T[p];
      sum[0]=0;
      for(int i=1;i<p;i++)
        sum[i]=sum[i-1]+B[i*step_size-1]+A[i*step_size-1];
    
      #pragma omp parallel for
      for(int i=1; i<p; i++){
        int start=i*step_size;
        int end=start+step_size;
        if(i==p-1) end=cnt;
        T sum_=sum[i];
        for(I j=(I)start; j<(I)end; j++)
          B[j]+=sum_;
      }
      delete[] sum;
    }
    
    }
    
    
    class MortonId{
    
     public:
    
      MortonId();
    
      MortonId(MortonId m, unsigned char depth);
    
      template <class T>
      MortonId(T x_f,T y_f, T z_f, unsigned char depth=MAX_DEPTH);
    
      template <class T>
      MortonId(T* coord, unsigned char depth=MAX_DEPTH);
    
      unsigned int GetDepth() const;
    
      template <class T>
      void GetCoord(T* coord);
    
      MortonId NextId() const;
    
      MortonId getAncestor(unsigned char ancestor_level) const;
    
      /**
       * \brief Returns the deepest first descendant.
       */
      MortonId getDFD(unsigned char level=MAX_DEPTH) const;
    
      void NbrList(std::vector<MortonId>& nbrs,unsigned char level, int periodic) const;
    
      std::vector<MortonId> Children() const;
    
      int operator<(const MortonId& m) const;
    
      int operator>(const MortonId& m) const;
    
      int operator==(const MortonId& m) const;
    
      int operator!=(const MortonId& m) const;
    
      int operator<=(const MortonId& m) const;
    
      int operator>=(const MortonId& m) const;
    
      int isAncestor(MortonId const & other) const;
    
     private:
    
      UINT_T x,y,z;
      unsigned char depth;
    
    };
    
    inline MortonId::MortonId():x(0), y(0), z(0), depth(0){}
    
    inline MortonId::MortonId(MortonId m, unsigned char depth_):x(m.x), y(m.y), z(m.z), depth(depth_){}
    
    template <class T>
    inline MortonId::MortonId(T x_f,T y_f, T z_f, unsigned char depth_): depth(depth_){
      UINT_T max_int=((UINT_T)1)<<(MAX_DEPTH);
      x=(UINT_T)floor(x_f*max_int);
      y=(UINT_T)floor(y_f*max_int);
      z=(UINT_T)floor(z_f*max_int);
    }
    
    template <class T>
    inline MortonId::MortonId(T* coord, unsigned char depth_): depth(depth_){
      UINT_T max_int=((UINT_T)1)<<(MAX_DEPTH);
      x=(UINT_T)floor(coord[0]*max_int);
      y=(UINT_T)floor(coord[1]*max_int);
      z=(UINT_T)floor(coord[2]*max_int);
    }
    
    template <class T>
    inline void MortonId::GetCoord(T* coord){
      UINT_T max_int=((UINT_T)1)<<(MAX_DEPTH);
      T s=1.0/((T)max_int);
      coord[0]=x*s;
      coord[1]=y*s;
      coord[2]=z*s;
    }
    
    inline unsigned int MortonId::GetDepth() const{
      return depth;
    }
    
    inline MortonId MortonId::NextId() const{
      MortonId m=*this;
    
      UINT_T mask=((UINT_T)1)<<(MAX_DEPTH-depth);
      int i;
      for(i=depth;i>=0;i--){
        m.x=(m.x^mask);
        if((m.x & mask))
          break;
        m.y=(m.y^mask);
        if((m.y & mask))
          break;
        m.z=(m.z^mask);
        if((m.z & mask))
          break;
        mask=(mask<<1);
      }
      m.depth=i;
      return m;
    }
    
    inline MortonId MortonId::getAncestor(unsigned char ancestor_level) const{
      MortonId m=*this;
      m.depth=ancestor_level;
    
      UINT_T mask=(((UINT_T)1)<<(MAX_DEPTH))-(((UINT_T)1)<<(MAX_DEPTH-ancestor_level));
      m.x=(m.x & mask);
      m.y=(m.y & mask);
      m.z=(m.z & mask);
      return m;
    }
    
    inline MortonId MortonId::getDFD(unsigned char level) const{
      MortonId m=*this;
      m.depth=level;
      return m;
    }
    
    inline int MortonId::operator<(const MortonId& m) const{
      if(x==m.x && y==m.y && z==m.z) return depth<m.depth;
      UINT_T x_=(x^m.x);
      UINT_T y_=(y^m.y);
      UINT_T z_=(z^m.z);
    
      if((z_>x_ || ((z_^x_)<x_ && (z_^x_)<z_)) && (z_>y_ || ((z_^y_)<y_ && (z_^y_)<z_)))
        return z<m.z;
      if(y_>x_ || ((y_^x_)<x_ && (y_^x_)<y_))
        return y<m.y;
      return x<m.x;
    }
    
    inline int MortonId::operator>(const MortonId& m) const{
      if(x==m.x && y==m.y && z==m.z) return depth>m.depth;
      UINT_T x_=(x^m.x);
      UINT_T y_=(y^m.y);
      UINT_T z_=(z^m.z);
    
      if((z_>x_ || ((z_^x_)<x_ && (z_^x_)<z_)) && (z_>y_ || ((z_^y_)<y_ && (z_^y_)<z_)))
        return z>m.z;
      if((y_>x_ || ((y_^x_)<x_ && (y_^x_)<y_)))
        return y>m.y;
      return x>m.x;
    }
    
    inline int MortonId::operator==(const MortonId& m) const{
      return (x==m.x && y==m.y && z==m.z && depth==m.depth);
    }
    
    inline int MortonId::operator!=(const MortonId& m) const{
      return !(*this==m);
    }
    
    inline int MortonId::operator<=(const MortonId& m) const{
      return !(*this>m);
    }
    
    inline int MortonId::operator>=(const MortonId& m) const{
      return !(*this<m);
    }
    
    inline int MortonId::isAncestor(MortonId const & other) const {
      return other.depth>depth && other.getAncestor(depth)==*this;
    }
    
    
    void MortonId::NbrList(std::vector<MortonId>& nbrs, unsigned char level, int periodic) const{
      nbrs.clear();
      static int dim=3;
      static int nbr_cnt=(int)(pow(3.0,dim)+0.5);
      static UINT_T maxCoord=(((UINT_T)1)<<(MAX_DEPTH));
    
      UINT_T mask=maxCoord-(((UINT_T)1)<<(MAX_DEPTH-level));
      UINT_T pX=x & mask;
      UINT_T pY=y & mask;
      UINT_T pZ=z & mask;
    
      MortonId mid_tmp;
      mask=(((UINT_T)1)<<(MAX_DEPTH-level));
      for(int i=0; i<nbr_cnt; i++){
        INT_T dX = ((i/1)%3-1)*mask;
        INT_T dY = ((i/3)%3-1)*mask;
        INT_T dZ = ((i/9)%3-1)*mask;
        INT_T newX=(INT_T)pX+dX;
        INT_T newY=(INT_T)pY+dY;
        INT_T newZ=(INT_T)pZ+dZ;
        if(!periodic){
          if(newX>=0 && newX<(INT_T)maxCoord)
          if(newY>=0 && newY<(INT_T)maxCoord)
          if(newZ>=0 && newZ<(INT_T)maxCoord){
            mid_tmp.x=newX; mid_tmp.y=newY; mid_tmp.z=newZ;
            mid_tmp.depth=level;
            nbrs.push_back(mid_tmp);
          }
        }else{
          if(newX<0) newX+=maxCoord; if(newX>=(INT_T)maxCoord) newX-=maxCoord;
          if(newY<0) newY+=maxCoord; if(newY>=(INT_T)maxCoord) newY-=maxCoord;
          if(newZ<0) newZ+=maxCoord; if(newZ>=(INT_T)maxCoord) newZ-=maxCoord;
          mid_tmp.x=newX; mid_tmp.y=newY; mid_tmp.z=newZ;
          mid_tmp.depth=level;
          nbrs.push_back(mid_tmp);
        }
      }
    }
    
    std::vector<MortonId> MortonId::Children() const{
      static int dim=3;
      static int c_cnt=(1UL<<dim);
      static UINT_T maxCoord=(((UINT_T)1)<<(MAX_DEPTH));
      std::vector<MortonId> child(c_cnt);
    
      UINT_T mask=maxCoord-(((UINT_T)1)<<(MAX_DEPTH-depth));
      UINT_T pX=x & mask;
      UINT_T pY=y & mask;
      UINT_T pZ=z & mask;
    
      mask=(((UINT_T)1)<<(MAX_DEPTH-(depth+1)));
      for(int i=0; i<c_cnt; i++){
        child[i].x=pX+mask*((i/1)%2);
        child[i].y=pY+mask*((i/2)%2);
        child[i].z=pZ+mask*((i/4)%2);
        child[i].depth=depth+1;
      }
      return child;
    }
    
    
    
    //list must be sorted.
    void lineariseList(std::vector<MortonId> & list) {
      if(!list.empty()) {// Remove duplicates and ancestors.
        std::vector<MortonId> tmp;
        for(unsigned int i = 0; i < (list.size()-1); i++) {
          if( (!(list[i].isAncestor(list[i+1]))) && (list[i] != list[i+1]) ) {
            tmp.push_back(list[i]);
          }
        }
        tmp.push_back(list[list.size()-1]);
        list.swap(tmp);
      }
    }
    
    void balanceOctree (std::vector<MortonId > &in, std::vector<MortonId > &out,
        unsigned int maxDepth, bool periodic) {
      unsigned int  dim=3;
    
      int omp_p=omp_get_max_threads();
      if(in.size()==1){
        out=in;
        return 0;
      }
    
      //Build level-by-level set of nodes.
      std::vector<std::set<MortonId> > nodes((maxDepth+1)*omp_p);
      #pragma omp parallel for
      for(int p=0;p<omp_p;p++){
        size_t a=( p   *in.size())/omp_p;
        size_t b=((p+1)*in.size())/omp_p;
        for(size_t i=a;i<b;){
          size_t d=in[i].GetDepth();
          if(d==0){i++; continue;}
          MortonId pnode=in[i].getAncestor(d-1);
          nodes[d-1+(maxDepth+1)*p].insert(pnode);
          while(i<b && d==in[i].GetDepth() && pnode==in[i].getAncestor(d-1)) i++;
        }
    
        //Add new nodes level-by-level.
        std::vector<MortonId> nbrs;
        unsigned int num_chld=1UL<<dim;
        for(unsigned int l=maxDepth;l>=1;l--){
          //Build set of parents of balancing nodes.
          std::set<MortonId> nbrs_parent;
          std::set<MortonId>::iterator start=nodes[l+(maxDepth+1)*p].begin();
          std::set<MortonId>::iterator end  =nodes[l+(maxDepth+1)*p].end();
          for(std::set<MortonId>::iterator node=start; node != end;){
            node->NbrList(nbrs, l, periodic);
            int nbr_cnt=nbrs.size();
            for(int i=0;i<nbr_cnt;i++)
              nbrs_parent.insert(nbrs[i].getAncestor(l-1));
            node++;
          }
          //Get the balancing nodes.
          std::set<MortonId>& ancestor_nodes=nodes[l-1+(maxDepth+1)*p];
          start=nbrs_parent.begin();
          end  =nbrs_parent.end();
          ancestor_nodes.insert(start,end);
        }
    
        //Remove ancestors nodes. (optional)
        for(unsigned int l=1;l<=maxDepth;l++){
          std::set<MortonId>::iterator start=nodes[l  +(maxDepth+1)*p].begin();
          std::set<MortonId>::iterator end  =nodes[l  +(maxDepth+1)*p].end();
          std::set<MortonId>& ancestor_nodes=nodes[l-1+(maxDepth+1)*p];
          for(std::set<MortonId>::iterator node=start; node != end; node++){
            MortonId parent=node->getAncestor(node->GetDepth()-1);
            ancestor_nodes.erase(parent);
          }
        }
      }
    
      //Resize out.
      std::vector<size_t> node_cnt(omp_p,0);
      std::vector<size_t> node_dsp(omp_p,0);
      #pragma omp parallel for
      for(int i=0;i<omp_p;i++){
        for(unsigned int j=0;j<=maxDepth;j++)
          node_cnt[i]+=nodes[j+i*(maxDepth+1)].size();
      }
      omp_par::scan(&node_cnt[0],&node_dsp[0], omp_p);
      out.resize(node_cnt[omp_p-1]+node_dsp[omp_p-1]);
    
      //Copy leaf nodes to out.
      #pragma omp parallel for
      for(int p=0;p<omp_p;p++){
        size_t node_iter=node_dsp[p];
        for(unsigned int l=0;l<=maxDepth;l++){
          std::set<MortonId>::iterator start=nodes[l  +(maxDepth+1)*p].begin();
          std::set<MortonId>::iterator end  =nodes[l  +(maxDepth+1)*p].end();
          for(std::set<MortonId>::iterator node=start; node != end; node++)
            out[node_iter++]=*node;
        }
      }
    
      //Sort, Linearise, Redistribute.
      omp_par::merge_sort(&out[0], &out[out.size()]);
      lineariseList(out);
    
      { // Add children
        MortonId nxt_mid(0,0,0,0);
    
        std::vector<MortonId> out1;
        std::vector<MortonId> children;
        for(size_t i=0;i<out.size();i++){
          while(nxt_mid.getDFD()<out[i]){
            while(nxt_mid.isAncestor(out[i])){
              nxt_mid=nxt_mid.getAncestor(nxt_mid.GetDepth()+1);
            }
            out1.push_back(nxt_mid);
            nxt_mid=nxt_mid.NextId();
          }
    
          children=out[i].Children();
          for(size_t j=0;j<8;j++){
            out1.push_back(children[j]);
          }
          nxt_mid=out[i].NextId();
        }
        while(nxt_mid.GetDepth()>0){
          out1.push_back(nxt_mid);
          nxt_mid=nxt_mid.NextId();
        }
        out.swap(out1);
      }
    }
    

    以上代码针对性能进行了优化,OpenMP 进一步使事情复杂化,但基本思想是这样的:

    1. 逐级构建一组树节点 (MortonIds)。
    2. 从最精细的级别循环到根级别。
      • 在每一层,循环遍历该层的所有树节点,并为其父节点的邻居计算 MortonId。将这些添加到较粗级别的树节点集(确保不添加重复节点)。
    3. 最后,取出所有树节点(来自所有级别),并按其 MortonId 对其进行排序。
    4. 移除祖先节点以获得平衡的线性八叉树。

    【讨论】:

    • 谢谢,这很好!我将阅读这篇论文,但您介意概述一下平衡算法的工作原理吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-05-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多