【问题标题】:Processing Shared Work Queue Using CUDA Atomic Operations and Grid Synchronization使用 CUDA 原子操作和网格同步处理共享工作队列
【发布时间】:2021-01-03 20:50:46
【问题描述】:

我正在尝试编写一个内核,其线程迭代地处理工作队列中的项目。我的理解是,我应该能够通过使用原子操作来操作工作队列(即从队列中抓取工作项并将新工作项插入队列),并通过协作组使用网格同步来确保所有线程在同一迭代中(我确保线程块的数量不超过内核的设备容量)。但是,有时我观察到工作项在迭代期间被多次跳过或处理。

以下代码是显示这一点的工作示例。在此示例中,创建了一个大小为input_len 的数组,其中包含工作项0input_len - 1processWorkItems 内核处理这些项目以进行 max_iter 迭代。每个工作项都可以将自己及其上一个和下一个工作项放入工作队列中,但marked 数组用于确保在迭代期间,每个工作项最多被添加到工作队列中一次。最后应该发生的是histogram 中的值总和等于input_len * max_iter,并且histogram 中的任何值都不大于1。但我观察到偶尔会在输出中违反这两个标准,这意味着我没有得到原子操作和/或正确的同步。如果有人能指出我的推理和/或实施中的缺陷,我将不胜感激。我的操作系统是 Ubuntu 18.04,CUDA 版本是 10.1,我在 P100、V100 和 RTX 2080 Ti GPU 上进行了实验,并观察到了类似的行为。

我用于为 RTX 2080 Ti 编译的命令:

nvcc -O3 -o atomicsync atomicsync.cu --gpu-architecture=compute_75 -rdc=true

在 RTX 2080 Ti 上运行的一些输入和输出:

./atomicsync 50 1000 1000
Skipped 0.01% of items. 5 extra item processing.
./atomicsync 500 1000 1000
Skipped 0.00% of items. 6 extra item processing.
./atomicsync 5000 1000 1000
Skipped 0.00% of items. 14 extra item processing.

atomicsync.cu:

#include <stdio.h>
#include <cooperative_groups.h>

#define checkCudaErrors(val) check ( (val), #val, __FILE__, __LINE__ )
template< typename T >
void check(T result, char const *const func, const char *const file, int const line)
{
    if (result)
    {
        fprintf(stderr, "CUDA error at %s:%d code=%d(%s) \"%s\" \n", file, line, static_cast<unsigned int>(result), cudaGetErrorString(result), func);
        cudaDeviceReset();
        exit(EXIT_FAILURE);
    }
}

__device__ inline void addWorkItem(int input_len, int item, int item_adder, int iter, int *queue, int *queue_size, int *marked) {
    int already_marked = atomicExch(&marked[item], 1);
    if(already_marked == 0) {
        int idx = atomicAdd(&queue_size[iter + 1], 1);
        queue[(iter + 1) * input_len + idx] = item;
    }
}

__global__ void processWorkItems(int input_len, int max_iter, int *histogram, int *queue, int *queue_size, int *marked) {
    auto grid = cooperative_groups::this_grid();

    const int items_per_block = (input_len + gridDim.x - 1) / gridDim.x;

    for(int iter = 0; iter < max_iter; ++iter) {
        while(true) {
            // Grab work item to process
            int idx = atomicSub(&queue_size[iter], 1);
            --idx;
            if(idx < 0) {
                break;
            }
            int item = queue[iter * input_len + idx];

            // Keep track of processed work items
             ++histogram[iter * input_len + item];

            // Add previous, self, and next work items to work queue
            if(item > 0) {
                addWorkItem(input_len, item - 1, item, iter, queue, queue_size, marked);
            }
            addWorkItem(input_len, item, item, iter, queue, queue_size, marked);
            if(item + 1 < input_len) {
                addWorkItem(input_len, item + 1, item, iter, queue, queue_size, marked);
            }
        }
        __threadfence_system();
        grid.sync();

        // Reset marked array for next iteration
        for(int i = 0; i < items_per_block; ++i) {
            if(blockIdx.x * items_per_block + i < input_len) {
                marked[blockIdx.x * items_per_block + i] = 0;
            }
        }
        __threadfence_system();
        grid.sync();
    }
}

int main(int argc, char* argv[])
{
    int input_len = atoi(argv[1]);
    int max_iter = atoi(argv[2]);
    int num_blocks = atoi(argv[3]);

    // A histogram to keep track of work items that have been processed in each iteration
    int histogram_host[input_len * max_iter];
    memset(histogram_host, 0, sizeof(int) * input_len * max_iter);
    int *histogram_device;
    checkCudaErrors(cudaMalloc(&histogram_device, sizeof(int) * input_len * max_iter));
    checkCudaErrors(cudaMemcpy(histogram_device, histogram_host, sizeof(int) * input_len * max_iter, cudaMemcpyHostToDevice));

    // Size of the work queue for each iteration
    int queue_size_host[max_iter + 1];
    queue_size_host[0] = input_len;
    memset(&queue_size_host[1], 0, sizeof(int) * max_iter);
    int *queue_size_device;
    checkCudaErrors(cudaMalloc(&queue_size_device, sizeof(int) * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_size_device, queue_size_host, sizeof(int) * (max_iter + 1), cudaMemcpyHostToDevice));

    // Work queue
    int queue_host[input_len * (max_iter + 1)];
    for(int i = 0; i < input_len; ++i) {
        queue_host[i] = i;
    }
    memset(&queue_host[input_len], 0, sizeof(int) * input_len * max_iter);
    int *queue_device;
    checkCudaErrors(cudaMalloc(&queue_device, sizeof(int) * input_len * (max_iter + 1)));
    checkCudaErrors(cudaMemcpy(queue_device, queue_host, sizeof(int) * input_len * (max_iter + 1), cudaMemcpyHostToDevice));

    // An array used to keep track of work items already added to the work queue to
    // avoid multiple additions of a work item in the same iteration
    int marked_host[input_len];
    memset(marked_host, 0, sizeof(int) * input_len);
    int *marked_device;
    checkCudaErrors(cudaMalloc(&marked_device, sizeof(int) * input_len));
    checkCudaErrors(cudaMemcpy(marked_device, marked_host, sizeof(int) * input_len, cudaMemcpyHostToDevice));

    const dim3 threads(1, 1, 1);
    const dim3 blocks(num_blocks, 1, 1);

    processWorkItems<<<blocks, threads>>>(input_len, max_iter, histogram_device, queue_device, queue_size_device, marked_device);
    checkCudaErrors(cudaDeviceSynchronize());

    checkCudaErrors(cudaMemcpy(histogram_host, histogram_device, sizeof(int) * input_len * max_iter, cudaMemcpyDeviceToHost));

    int extra = 0;
    double deficit = 0;
    for(int i = 0; i < input_len; ++i) {
        int cnt = 0;
        for(int iter = 0; iter < max_iter; ++iter) {
            if(histogram_host[iter * input_len + i] > 1) {
                ++extra;
            }
            cnt += histogram_host[iter * input_len + i];
        }
        deficit += max_iter - cnt;
    }
    printf("Skipped %.2f%% of items. %d extra item processing.\n", deficit / (input_len * max_iter) * 100, extra);

    checkCudaErrors(cudaFree(histogram_device));
    checkCudaErrors(cudaFree(queue_device));
    checkCudaErrors(cudaFree(queue_size_device));
    checkCudaErrors(cudaFree(marked_device));

    return 0;
}

【问题讨论】:

    标签: cuda gpgpu


    【解决方案1】:

    您可能希望在programming gude 中阅读如何进行协作网格内核启动,或研究任何使用网格同步的 cuda 示例代码(例如 reductionMultiBlockCG,还有其他)。

    你做错了。您无法使用普通的 &lt;&lt;&lt;...&gt;&gt;&gt; 启动语法启动协作网格。因此,没有理由假设您内核中的grid.sync() 工作正常。

    通过在cuda-memcheck 下运行,很容易看到网格同步在您的代码中不起作用。当你这样做时,结果会变得更糟。

    当我修改您的代码以进行适当的合作启动时,我对 Tesla V100 没有任何问题:

    $ cat t1811.cu
    #include <stdio.h>
    #include <cooperative_groups.h>
    
    #define checkCudaErrors(val) check ( (val), #val, __FILE__, __LINE__ )
    template< typename T >
    void check(T result, char const *const func, const char *const file, int const line)
    {
        if (result)
        {
            fprintf(stderr, "CUDA error at %s:%d code=%d(%s) \"%s\" \n", file, line, static_cast<unsigned int>(result), cudaGetErrorString(result), func);
            cudaDeviceReset();
            exit(EXIT_FAILURE);
        }
    }
    
    __device__ inline void addWorkItem(int input_len, int item, int item_adder, int iter, int *queue, int *queue_size, int *marked) {
        int already_marked = atomicExch(&marked[item], 1);
        if(already_marked == 0) {
            int idx = atomicAdd(&queue_size[iter + 1], 1);
            queue[(iter + 1) * input_len + idx] = item;
        }
    }
    
    __global__ void processWorkItems(int input_len, int max_iter, int *histogram, int *queue, int *queue_size, int *marked) {
        auto grid = cooperative_groups::this_grid();
    
        const int items_per_block = (input_len + gridDim.x - 1) / gridDim.x;
    
        for(int iter = 0; iter < max_iter; ++iter) {
            while(true) {
                // Grab work item to process
                int idx = atomicSub(&queue_size[iter], 1);
                --idx;
                if(idx < 0) {
                    break;
                }
                int item = queue[iter * input_len + idx];
    
                // Keep track of processed work items
                 ++histogram[iter * input_len + item];
    
                // Add previous, self, and next work items to work queue
                if(item > 0) {
                    addWorkItem(input_len, item - 1, item, iter, queue, queue_size, marked);
                }
                addWorkItem(input_len, item, item, iter, queue, queue_size, marked);
                if(item + 1 < input_len) {
                    addWorkItem(input_len, item + 1, item, iter, queue, queue_size, marked);
                }
            }
            __threadfence_system();
            grid.sync();
    
            // Reset marked array for next iteration
            for(int i = 0; i < items_per_block; ++i) {
                if(blockIdx.x * items_per_block + i < input_len) {
                    marked[blockIdx.x * items_per_block + i] = 0;
                }
            }
            __threadfence_system();
            grid.sync();
        }
    }
    
    int main(int argc, char* argv[])
    {
        int input_len = atoi(argv[1]);
        int max_iter = atoi(argv[2]);
        int num_blocks = atoi(argv[3]);
    
        // A histogram to keep track of work items that have been processed in each iteration
        int *histogram_host = new int[input_len * max_iter];
        memset(histogram_host, 0, sizeof(int) * input_len * max_iter);
        int *histogram_device;
        checkCudaErrors(cudaMalloc(&histogram_device, sizeof(int) * input_len * max_iter));
        checkCudaErrors(cudaMemcpy(histogram_device, histogram_host, sizeof(int) * input_len * max_iter, cudaMemcpyHostToDevice));
    
        // Size of the work queue for each iteration
        int queue_size_host[max_iter + 1];
        queue_size_host[0] = input_len;
        memset(&queue_size_host[1], 0, sizeof(int) * max_iter);
        int *queue_size_device;
        checkCudaErrors(cudaMalloc(&queue_size_device, sizeof(int) * (max_iter + 1)));
        checkCudaErrors(cudaMemcpy(queue_size_device, queue_size_host, sizeof(int) * (max_iter + 1), cudaMemcpyHostToDevice));
    
        // Work queue
        int *queue_host = new int[input_len * (max_iter + 1)];
        for(int i = 0; i < input_len; ++i) {
            queue_host[i] = i;
        }
        memset(&queue_host[input_len], 0, sizeof(int) * input_len * max_iter);
        int *queue_device;
        checkCudaErrors(cudaMalloc(&queue_device, sizeof(int) * input_len * (max_iter + 1)));
        checkCudaErrors(cudaMemcpy(queue_device, queue_host, sizeof(int) * input_len * (max_iter + 1), cudaMemcpyHostToDevice));
    
        // An array used to keep track of work items already added to the work queue to
        // avoid multiple additions of a work item in the same iteration
        int marked_host[input_len];
        memset(marked_host, 0, sizeof(int) * input_len);
        int *marked_device;
        checkCudaErrors(cudaMalloc(&marked_device, sizeof(int) * input_len));
        checkCudaErrors(cudaMemcpy(marked_device, marked_host, sizeof(int) * input_len, cudaMemcpyHostToDevice));
    
        const dim3 threads(1, 1, 1);
        const dim3 blocks(num_blocks, 1, 1);
        int dev = 0;
        int supportsCoopLaunch = 0;
        checkCudaErrors(cudaDeviceGetAttribute(&supportsCoopLaunch, cudaDevAttrCooperativeLaunch, dev));
        if (!supportsCoopLaunch) {printf("Cooperative Launch is not supported on this machine configuration.  Exiting."); return 0;}
        /// This will launch a grid that can maximally fill the GPU, on the default stream with kernel arguments
        int numBlocksPerSm = 0;
        // Number of threads my_kernel will be launched with
        int numThreads = threads.x;
        cudaDeviceProp deviceProp;
        checkCudaErrors(cudaGetDeviceProperties(&deviceProp, dev));
        checkCudaErrors(cudaOccupancyMaxActiveBlocksPerMultiprocessor(&numBlocksPerSm, processWorkItems, numThreads, 0));
        // launch
        void *kernelArgs[] = { &input_len, &max_iter, &histogram_device, &queue_device, &queue_size_device, &marked_device};
        dim3 dimBlock = dim3(numThreads,1,1);
        num_blocks = min(num_blocks, deviceProp.multiProcessorCount*numBlocksPerSm);
        dim3 dimGrid(num_blocks, 1, 1);
        printf("launching %d blocks\n", dimGrid.x);
        checkCudaErrors(cudaLaunchCooperativeKernel((void*)processWorkItems, dimGrid, dimBlock, kernelArgs));
    
        // processWorkItems<<<blocks, threads>>>(input_len, max_iter, histogram_device, queue_device, queue_size_device, marked_device);
        checkCudaErrors(cudaDeviceSynchronize());
    
        checkCudaErrors(cudaMemcpy(histogram_host, histogram_device, sizeof(int) * input_len * max_iter, cudaMemcpyDeviceToHost));
    
        int extra = 0;
        double deficit = 0;
        for(int i = 0; i < input_len; ++i) {
            int cnt = 0;
            for(int iter = 0; iter < max_iter; ++iter) {
                if(histogram_host[iter * input_len + i] > 1) {
                    ++extra;
                }
                cnt += histogram_host[iter * input_len + i];
            }
            deficit += max_iter - cnt;
        }
        printf("Skipped %.2f%% of items. %d extra item processing.\n", deficit / (input_len * max_iter) * 100, extra);
    
        checkCudaErrors(cudaFree(histogram_device));
        checkCudaErrors(cudaFree(queue_device));
        checkCudaErrors(cudaFree(queue_size_device));
        checkCudaErrors(cudaFree(marked_device));
    
        return 0;
    }
    $ nvcc -o t1811 t1811.cu -arch=sm_70 -std=c++11 -rdc=true
    $ cuda-memcheck ./t1811 50 1000 5000
    ========= CUDA-MEMCHECK
    launching 2560 blocks
    Skipped 0.00% of items. 0 extra item processing.
    ========= ERROR SUMMARY: 0 errors
    $ cuda-memcheck ./t1811 50 1000 1000
    ========= CUDA-MEMCHECK
    launching 1000 blocks
    Skipped 0.00% of items. 0 extra item processing.
    ========= ERROR SUMMARY: 0 errors
    $ ./t1811 50 1000 5000
    launching 2560 blocks
    Skipped 0.00% of items. 0 extra item processing.
    $ ./t1811 50 1000 1000
    launching 1000 blocks
    Skipped 0.00% of items. 0 extra item processing.
    $ ./t1811 50 1000 1000
    launching 1000 blocks
    Skipped 0.00% of items. 0 extra item processing.
    $
    

    我并不是说上述代码没有缺陷或适用于任何特定目的。它主要是您的代码。我已经对其进行了修改,只是为了演示所提到的概念。

    顺便说一句,我将一些基于堆栈的大型内存分配更改为基于堆。我不建议尝试创建像这样的大型基于堆栈的数组:

    int histogram_host[input_len * max_iter];
    

    我认为这样做更好:

    int *histogram_host = new int[input_len * max_iter];
    

    随着您输入的命令行参数变大,这可能会成为一个问题,具体取决于机器特性。然而,这与 CUDA 没有太大关系。我没有尝试在您的代码中解决此模式的每个实例。

    虽然与此特定问题无关,但网格同步还有其他成功使用的要求。这些都包含在编程指南中,可能包括但不限于:

    • 平台支持(例如操作系统、GPU 等)
    • 内核大小要求(启动的线程或线程块总数)

    编程指南包含方便的样板代码,可用于满足这些要求。

    【讨论】:

    • 感谢您的回复,确实解决了问题。如果我可以问,你有什么特别建议改变的吗?需要明确的是,我不打算要求您修复我的代码,但任何一般性建议都值得赞赏。谢谢。
    • 抱歉,在我发表评论之前没有看到您的编辑。再次感谢。
    猜你喜欢
    • 2012-07-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-08
    • 2021-07-26
    • 2011-08-10
    • 1970-01-01
    • 2019-06-03
    相关资源
    最近更新 更多