【问题标题】:[OpenCL]nearest neighbour using euclidean distance[OpenCL]使用欧几里得距离的最近邻
【发布时间】:2011-07-19 21:05:33
【问题描述】:

我正在使用 OpenCL 来查找两组 3D 点之间的最近邻。

最近邻:对于数据集中的每个点(x,y,z),我必须找到模型中最近的一个。平方距离 = (Ax-Bx)^2 + (Ay-By)^2 + (Az-Bz)^2

这是我目前所做的:

struct point {
int x;
int y;
int z;
};

__kernel void 
nearest_neighbour(__global struct point *model,
__global struct point *dataset,
__global int *nearest,
const unsigned int model_size)
{
    int g_dataset_id = get_global_id(0);

    int dmin = -1;
    int d, dx, dy, dz;

    for (int i=0; i<model_size; ++i) {
        dx = model[i].x - dataset[g_dataset_id].x;
        dx = dx * dx;

        dy = model[i].y - dataset[g_dataset_id].y;
        dy = dy * dy;

        dz = model[i].z - dataset[g_dataset_id].z;
        dz = dz * dz;

        d = dx + dy + dz;

        if(dmin == -1 || d < dmin)
        {
            nearest[g_dataset_id] = i;
            dmin = d;
        }
    }
}

代码似乎可以工作,但我确信它可以被优化。 我想知道如何利用本地内存使其变得更好。

谢谢

附:我知道还有其他(更好的)方法可以找到最近的邻居,比如 kd-tree,但现在我想做一个简单的方法。

【问题讨论】:

    标签: c++ c opencl nearest-neighbor


    【解决方案1】:

    编译器可能会为您提升这些循环不变量,但要确保它完成,请尝试使用此代码将它们分配给名为 datum_x 的临时变量等等。此外,将 dmin 初始化为MAX_INT 可以避免与-1 进行多余的比较。另一种方法是展开第一个循环迭代(使用i=0)以初始化 dmin。

    int dmin = MAX_INT;
    int d, dx, dy, dz;
    int datum_x, datum_y, datum_z;
    
    datum_x = dataset[g_model_id].x;
    datum_y = dataset[g_model_id].y;
    datum_z = dataset[g_model_id].z;
    
    for (int i=0; i<size_dataset; ++i) {
        dx = model[i].x - datum_x;
        dx = dx * dx;
    
        dy = model[i].y - datum_y;
        dy = dy * dy;
    
        dz = model[i].z - datum_z;
        dz = dz * dz;
    
        d = dx + dy + dz;
    
        if(d < dmin)
        {
            nearest[g_dataset_id] = i;
            dmin = d;
        }
    }
    

    【讨论】:

    • 谢谢,实际上您的建议执行时间已从 5.3 秒减少到 3.6 秒(模型:200.000 点和数据集 200.000 点 - 在 ATI 5850 上)
    【解决方案2】:

    也许一个快速的预过滤器可以加快速度。您可以先检查所有三个坐标中的距离是否都小于 dmin,而不是立即计算平方距离。所以,你可以用

    替换你的内部循环
    {
        dx = model[i].x - datum_x;
        if (abs(dx) >= dmin) continue;
    
        dy = model[i].y - datum_y;
        if (abs(dy) >= dmin) continue;
    
        dz = model[i].z - datum_z;
        if (abs(dz) >= dmin) continue;
    
        dx = dx * dx;    
        dy = dy * dy;
        dz = dz * dz;
    
        d = dx + dy + dz;
    
        if(d < dmin)
        {
            nearest[g_dataset_id] = i;
            dmin = d;
        }
    }
    

    我不确定每个点对abs()ifs 的额外调用是否会过滤掉足够多的数据点,但我认为这是一个足够简单的更改来尝试。

    【讨论】:

    • 谢谢,你的建议非常好,执行时间从 3.6s 减少到 2.7s。基于此,我制作了另一个不使用 abs() 的版本(现在为 2.3s):dx = model[i].x - local_x; dx = dx * dx; if (dx &gt;= dmin) continue; dy = model[i].y - local_y; dy = dy * dy; if (dy &gt;= dmin) continue; dz = model[i].z - local_z; dz = dz * dz; if (dz &gt;= dmin) continue; d = dx + dy + dz;
    • 看起来更好。乘法极慢的时代早已过去,我怀疑 abs() 和乘法将花费相当多的时间。
    【解决方案3】:

    我首先想到的是希思提出的建议。每个工作项同时访问内存项model[i]。根据编译器的优化程度,最好让每个工作项访问数组中的不同元素。一种错开方式是:

    int datum_x, datum_y, datum_z;
    
    datum_x = dataset[g_model_id].x;
    datum_y = dataset[g_model_id].y;
    datum_z = dataset[g_model_id].z;
    
    for (int i=0; i<size_dataset; ++i) {
        j = (i + g_model_id) % size_dataset;  // i --> j by cyclic permutation
    
        dx = model[j].x - datum_x;
        dx = dx * x;
    
        dy = model[j].y - datum_y;
        dy = dy * dy;
    
        /* and so on... */
    }
    

    但是,您的代码中对model[i] 的访问很可能被当作“广播”处理,在这种情况下,我的代码会运行得更慢。

    【讨论】:

      【解决方案4】:

      我很确定花费了大量时间将当前最小值写入nearest[g_dataset_id]。访问全局内存通常很慢,因此最好将当前最小值存储在寄存器中,就像使用 dmin = d 一样。

      就像这样:

      ...
      int dmin = MAX_INT;
      int imin = 0;
      ...
      for (...)
      {
        ...
        if(d < dmin)
        {
          imin = i;
          dmin = d;
        }
      }
      
      nearest[g_dataset_id] = imin; //write to global memory only once
      

      【讨论】:

      • 您好,其实时间不是很多,但是性能有所提升,谢谢您的建议
      【解决方案5】:

      Heath 的建议也可以应用于输出索引:维护一个变量nearest_id,并且在循环之后只写一次。

      我会尝试使用 int4 向量,而不是 3 组件结构,并使用向量操作。

      【讨论】:

      • 感谢您的建议,您有关于 int4/float4 向量和向量运算的链接吗?我想看看
      【解决方案6】:

      根据 Eric Bainville 的建议,我试图摆脱点结构。正如我所建议的那样,我使用了 float4,这就是我所做的:

      __kernel void 
      nearest_neighbour(__global float4 *model,
      __global float4 *dataset,
      __global unsigned int *nearest,
      const unsigned int model_size)
      {
          int g_dataset_id = get_global_id(0);
      
          float dmin = MAXFLOAT;
          float d;
      
          /* Ottimizzato per memoria locale */
          float4 local_xyz = dataset[g_dataset_id];
          float4 d_xyz;
          int imin;
      
          for (int i=0; i<model_size; ++i) {
              d_xyz = model[i] - local_xyz;
      
              d_xyz *= d_xyz;
      
              d = d_xyz.x + d_xyz.y + d_xyz.z;
      
              if(d < dmin)
              {
                  imin = i;
                  dmin = d;
              }
          }
      
          nearest[g_dataset_id] = imin; // Write only once in global memory
      }
      

      问题是这个版本比基于点结构的版本运行得慢一些。可能是因为在结构一中我使用了预过滤器:

      dx = model[i].x - local_x;
      dx = dx * dx;
      if (dx >= dmin) continue;
      
      dy = model[i].y - local_y;
      dy = dy * dy;
      if (dy >= dmin) continue;
      
      dz = model[i].z - local_z;
      dz = dz * dz;
      if (dz >= dmin) continue;
      
      d = dx + dy + dz;
      

      我不能使用预过滤器宽度 float4 版本。 您认为我可以对 float4 版本进行其他优化吗?

      感谢大家的宝贵建议

      【讨论】:

      • 如果确保数据集中第 4 个分量为 0,则可以使用d = length(model[i] - local_xyz)d = dot(d_xyz,d_xyz)。摆脱 if 可能会加速循环:imin=(d&lt;dmin)?i:imindmin=min(d,dmin)。正如您在问题中所建议的那样,您的下一步将是使用本地内存。
      【解决方案7】:

      针对您的具体问题“我想知道如何利用本地内存使其变得更好。”

      使用 GPU 的本地内存可能很棘手。在处理它之前,您需要花一些时间阅读 SDK 的代码示例和编程指南。

      基本上,您使用本地内存来缓存一些全局数据块——在你的例子中是 model[] 数组——这样你可以从那里读取它比从全局读取它更快。如果你确实想尝试一下,它会变成这样的伪代码:

      For each block of the model array {
          1) Read data from __global and write it to __local
          2) Barrier
          3) For each model datum in the __local cache,
             Read it and process it.
          4) Barrier
      }
      

      第 3 步基本上就是您现在的循环,只是它只会处理模型数据的一部分而不是整个数据。

      当您使用本地内存时,第 2 步和第 4 步是绝对必要的。您必须同步工作组中的所有主题。屏障强制所有工作项完成屏障之前的代码,然后才能允许它们中的任何一个继续执行屏障之后的代码。这可以防止工作项在其他线程将数据写入本地内存之前从本地内存中读取数据。我不记得屏障指令的语法,但它们在 OpenCL 文档中。

      第 1 步,您将让每个工作项从全局读取不同的数据并将其写入本地缓存。

      类似这样的东西(注意这过于简单且未经测试!):

      __local float4 modelcache[CACHESIZE];
      int me = get_local_id(0);
      
      for (int j = 0; j < model_size; j += CACHESIZE) {
          modelcache[me] = dataset[j+me];
          barrier(CLK_LOCAL_MEM_FENCE);
          for (int i=0; i < CACHESIZE; ++i) {
              d_xyz = modelcache[i] - local_xyz;
              ... etc.
          }
          barrier(CLK_LOCAL_MEM_FENCE);
      }
      

      那么设计问题是:本地缓存应该有多大?工作组的规模是多少?

      本地数据存储在工作组中的工作项之间共享。如果您的 ND 工作项数组并行执行多个工作组,则每个工作组都有自己的模型缓存副本。

      如果您将本地数据数组设置得太小,则使用它们会获得很少或没有任何好处。如果你把它们设置得太大,那么 GPU 就不能并行执行尽可能多的工作组,而且你实际上可能会运行得相当慢。

      最后,我不得不说这个特定的算法不太可能从本地内存缓存中受益。在您的程序中,所有工作项都在同时读取相同的 model[i] 位置,并且大多数 GPU 都具有专门优化的硬件以实现该速度。

      【讨论】:

      • 感谢您的示例,这对于更好地了解本地内存非常有用,只有一件事不清楚,在第 3 步中,我将最接近的保存为:imin = i;但是现在我不能用 i 了,怎么才能得到当前最好的“全局 id”呢?我试过 imin=j+i 但它只适用于前 N 个点。
      猜你喜欢
      • 1970-01-01
      • 2021-08-30
      • 2021-10-01
      • 2013-03-02
      • 2015-07-15
      • 2014-02-04
      • 1970-01-01
      相关资源
      最近更新 更多