警告:如果这是您计划在 GPU 上执行的唯一操作,我不会推荐它。将数据复制到 GPU 或从 GPU 复制数据的成本可能会超过使用 GPU 带来的任何可能的效率/性能优势。
编辑:基于序列阈值可能远大于 2 的 cmets,我将建议一种替代方法(方法 2),它应该比 for 循环更有效或蛮力方法(方法1)。
一般来说,我会将这个问题放在一个名为stream compaction 的类别中。流压缩通常是指获取一个数据序列并将其缩减为更小的数据序列。
如果我们查看推力流压缩区域,可以使用thrust::copy_if() 来解决这个问题的算法(特别是为了方便,采用模板数组的版本)。
方法一:
要并行考虑这个问题,我们必须问自己在什么条件下应该将给定元素从输入复制到输出?如果我们可以将这个逻辑形式化,我们可以构造一个推力函子,我们可以将其传递给thrust::copy_if 来指示它要复制哪些元素。
对于给定的元素,对于序列长度=2的情况,如果我们知道,我们可以构造一个完整的逻辑:
- 元素
- 元素右边一个位置
- 左边一位的元素
- 左边两个位置的元素
基于上述内容,我们需要为那些未定义上述 2、3 或 4 项的元素提出“特殊情况”逻辑。
忽略特殊情况,如果我们知道以上4项,那么我们可以构造如下必要的逻辑:
如果我左边的元素和我一样,但是左边两个地方的元素不同,那么我属于输出中
如果我左边的元素和我不同,但我右边的元素和我一样,我属于输出
否则,我不属于输出
我会让你为特殊情况构建必要的逻辑。 (或者根据我提供的代码对其进行逆向工程)。
方法二:
对于长序列,方法 1 或方法 1 中逻辑的 for 循环变体将针对序列长度的每个元素生成至少 1 次数据集读取。对于长序列(例如 2000 年),这将是低效的。因此,另一种可能的方法如下:
-
在正向和反向生成exclusive_scan_by_key,使用 ID 值作为键,thrust::constant_iterator(值=1)作为扫描值。对于给定的数据集,会产生如下中间结果:
ID: 1 2 2 3 3 3
VN: 6 7 8 5 7 8
FS: 0 0 1 0 1 2
RS: 0 1 0 2 1 0
其中 FS 和 RS 是正向和反向按键扫描的结果。我们使用.rbegin() 和.rend() reverse iterators 生成反向扫描(RS)。请注意,为了生成上述 RS 序列,必须对反向扫描输入和输出都执行此操作。
-
thrust::copy_if 函子的逻辑就变得相当简单了。对于给定元素,如果该元素的 RS 和 FS 值的 sum 大于或等于所需的最小序列长度(-1 表示独占扫描操作)并且 FS 值小于所需的最小序列长度,则该元素属于输出。
这是两个方法的完整示例,使用给定的数据,序列长度为 2:
$ cat t1095.cu
#include <thrust/device_vector.h>
#include <thrust/copy.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <iostream>
#include <thrust/scan.h>
#include <thrust/iterator/constant_iterator.h>
struct copy_func
{
int *d;
int dsize, r, l, m, l2;
copy_func(int *_d, int _dsize) : d(_d),dsize(_dsize) {};
__host__ __device__
bool operator()(int idx)
{
m = d[idx];
// handle typical case
// this logic could be replaced by a for-loop for sequences of arbitrary length
if ((idx > 1) && (idx < dsize-1)){
r = d[idx+1];
l = d[idx-1];
l2 = d[idx-2];
if ((r == m) && (m != l)) return true;
if ((l == m) && (m != l2)) return true;
return false;}
// handle special cases
if (idx == 0){
r = d[idx+1];
return (r == m);}
if (idx == 1){
r = d[idx+1];
l = d[idx-1];
if (l == m) return true;
else if (r == m) return true;
return false;}
if (idx == dsize-1){
l = d[idx-1];
l2 = d[idx-2];
if ((m == l) && (m != l2)) return true;
return false;}
// could put assert(0) here, should never get here
return false;
}
};
struct copy_func2
{
int thresh;
copy_func2(int _thresh) : thresh(_thresh) {};
template <typename T>
__host__ __device__
bool operator()(T t){
return (((thrust::get<0>(t) + thrust::get<1>(t))>=(thresh-1)) && (thrust::get<0>(t) < thresh));
}
};
int main(){
const int length_threshold = 2;
int ID[] = {1,2,2,3,3,3};
int VN[] = {6,7,8,5,7,8};
int dsize = sizeof(ID)/sizeof(int);
// we assume dsize > 3
thrust::device_vector<int> id(ID, ID+dsize);
thrust::device_vector<int> vn(VN, VN+dsize);
thrust::device_vector<int> res_id(dsize);
thrust::device_vector<int> res_vn(dsize);
thrust::counting_iterator<int> idx(0);
//method 1: sequence length threshold of 2
int rsize = thrust::copy_if(thrust::make_zip_iterator(thrust::make_tuple(id.begin(), vn.begin())), thrust::make_zip_iterator(thrust::make_tuple(id.end(), vn.end())), idx, thrust::make_zip_iterator(thrust::make_tuple(res_id.begin(), res_vn.begin())), copy_func(thrust::raw_pointer_cast(id.data()), dsize)) - thrust::make_zip_iterator(thrust::make_tuple(res_id.begin(), res_vn.begin()));
std::cout << "ID: ";
thrust::copy_n(res_id.begin(), rsize, std::ostream_iterator<int>(std::cout, " "));
std::cout << std::endl << "VN: ";
thrust::copy_n(res_vn.begin(), rsize, std::ostream_iterator<int>(std::cout, " "));
std::cout << std::endl;
//method 2: for arbitrary sequence length threshold
thrust::device_vector<int> res_fs(dsize);
thrust::device_vector<int> res_rs(dsize);
thrust::exclusive_scan_by_key(id.begin(), id.end(), thrust::constant_iterator<int>(1), res_fs.begin());
thrust::exclusive_scan_by_key(id.rbegin(), id.rend(), thrust::constant_iterator<int>(1), res_rs.begin());
rsize = thrust::copy_if(thrust::make_zip_iterator(thrust::make_tuple(id.begin(), vn.begin())), thrust::make_zip_iterator(thrust::make_tuple(id.end(), vn.end())), thrust::make_zip_iterator(thrust::make_tuple(res_fs.begin(), res_rs.rbegin())), thrust::make_zip_iterator(thrust::make_tuple(res_id.begin(), res_vn.begin())), copy_func2(length_threshold)) - thrust::make_zip_iterator(thrust::make_tuple(res_id.begin(), res_vn.begin()));
std::cout << "ID: ";
thrust::copy_n(res_id.begin(), rsize, std::ostream_iterator<int>(std::cout, " "));
std::cout << std::endl << "VN: ";
thrust::copy_n(res_vn.begin(), rsize, std::ostream_iterator<int>(std::cout, " "));
std::cout << std::endl;
return 0;
}
$ nvcc -o t1095 t1095.cu
$ ./t1095
ID: 2 2 3 3
VN: 7 8 5 7
ID: 2 2 3 3
VN: 7 8 5 7
注意事项:
-
copy_func 为方法 1 实现给定元素的测试逻辑。它接收该元素的索引(通过模板)以及指向设备上 ID 数据的指针,并且数据的大小,通过函子初始化参数。变量r、m、l和l2分别指的是我右边的元素、我自己、我左边的元素和我左边两个地方的元素。
我们将指向ID 数据的指针传递给函子。这允许函子为测试逻辑检索(最多)4 个必要元素。这避免了一个推力::zip_iterator 的混乱构造来提供所有这些值。请注意,仿函数中这些元素的读取应该很好地合并,因此相当有效,并且还可以从缓存中受益。
我并不声称这是没有缺陷的。我认为我的测试逻辑是正确的,但有可能我没有。至少,您应该验证该部分代码的逻辑正确性。我的目的不是给你一段黑盒代码,而是演示如何思考问题。
对于长于 2 的键序列,这种方法可能会变得很麻烦。在这种情况下,我建议使用方法 2。(如果您已经有一个实现必要逻辑的顺序 for 循环,则可以删除将其修改为方法 1 仿函数,以获得更长的键序列。这样的 for 循环可能仍应受益于缓存中的合并访问和相邻访问。)