【问题标题】:optimising column-wise maximum with SIMD使用 SIMD 优化列最大值
【发布时间】:2020-12-05 01:17:36
【问题描述】:

我有这个函数,我在代码中花费了大量时间,如果可能的话,我想通过矢量化 SIMD 编译器内部函数对其进行优化。

它本质上是在列的矩阵上找到最大值的值和位置,并将它们存储起来:

  • val_ptr:输入矩阵:列优先(Fortran 风格)n_rows-by-n_cols(通常为 n_rows>>n_cols)
  • opt_pos_ptr:长度为 n_rows 的 int 向量,用于存储最大值的位置。输入时用零填充。
  • max_ptr:长度为 n_rows 的浮点向量,用于存储最大值。在条目中填充了 val_ptr 第一列的副本
  • 函数将在并行循环中调用
  • 保证内存区域不重叠
  • 我真的不需要填充max_ptr,目前它只是用于记账和避免内存分配
  • 我在 Windows 10 上使用 MSVC、C++17。旨在运行现代 Intel CPU

代码,其中模板类型是浮点数或双精度:

template <typename eT>
find_max(const int n_cols, 
         const int n_rows, 
         const eT* val_ptr,
         int* opt_pos_ptr,
         eT* max_ptr){
    for (int col = 1; col < n_cols; ++col)
    {
        //Getting the pointer to the beginning of the column
        const auto* value_col = val_ptr + col * n_rows;
        //Looping over the rows
        for (int row = 0; row < n_rows; ++row)
        {
            //If the value is larger than the current maximum, we replace and we store its positions
            if (value_col[row] > max_ptr[row])
            {
                max_ptr[row] = value_col[row];
                opt_pos_ptr[row] = col;
            }
        }
    }
}

到目前为止我尝试了什么:

  • 我尝试在内部循环中使用 OpenMP 并行 for,但只在非常大的行上带来了一些东西,比我目前的使用量大一些。
  • 内部循环中的 if 阻止 #pragma omp simd 工作,没有它我无法重写它。

【问题讨论】:

  • “行”和“列”的含义在这里出现了互换。在C语言中,我们通常将固定iA[i][j]集合称为行,将固定jA[i][j]集合称为列,C地址为&amp;A[i][j] = &A[0] [0] + i*NumberOfColumns + j, but val_ptr + col * n_rows` 是从该约定中交换的。
  • 我编辑了问题,以修复错误并添加更多详细信息。你是对的:我使用列优先布局,因为我使用 Armadillo 来管理我的矩阵,并且我通常会使用指针来完成繁重的工作,就像在这种情况下一样
  • 我认为模板类型会使内在函数的使用变得困难。如果 eT 是双倍,你可以摆脱 if 使用 fmax 来获得最大值和 ?: 用于 opt_pos_ptr。如果您必须使用模板类型,请从您的问题中删除 C 标记
  • 标签已删除。您能否指出您提到的功能的文档?模板不是必需的,我可以为 float 和 double 编写 2 个单独的实现,这是我的主要用法
  • 对于 'fmax c++' 的 fmax 搜索会产生命中,例如 en.cppreference.com/w/cpp/numeric/math/fmax 您可以替换 if ( cond) { opt[row] = col; } with opt[row] = cond ? col:选择[行]; fmax 和 ?: 看起来都可能涉及分支,但编译器可以使用条件存储代替。当然,这是否有任何改进,只有测量才能证明。

标签: c++ sse simd intrinsics avx


【解决方案1】:

根据您发布的代码示例,您似乎想要计算垂直最大值,这意味着在您的情况下“列”是水平的。在 C/C++ 中,元素的水平序列(即两个相邻元素在内存中的距离为一个元素)通常称为行和垂直(其中两个相邻元素在内存中的距离为行大小) - 列。在下面的回答中,我将使用传统术语,其中行是水平的,列是垂直的。

此外,为简洁起见,我将重点介绍一种可能的矩阵元素类型 - floatdouble 的基本思想是相同的,主要区别在于每个向量的元素数量和 _ps/_pd 内在函数选择。我会在最后提供double 的版本。


这个想法是您可以使用_mm_max_ps/_mm_max_pd 并行计算多列的垂直最大值。为了也记录找到的最大值的位置,您可以将先前的最大值与当前元素进行比较。比较的结果是一个掩码,其中的元素是更新最大值的全1。该掩码也可用于选择需要更新的位置。

我必须注意,如果一列中有多个相等的最大元素,则下面的算法假定记录最大元素的位置并不重要。另外,我假设矩阵不包含 NaN 值,这会影响比较。稍后会详细介绍。

void find_max(const int n_cols, 
         const int n_rows, 
         const float* val_ptr,
         int* opt_pos_ptr,
         float* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 4 floats or 2 doubles
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128 mm_max = _mm_loadu_ps(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128 mm_value = _mm_loadu_ps(col_ptr);

            // See if this value is greater than the old maximum
            __m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));

            // Compute the maximum
            mm_max = _mm_max_ps(mm_value, mm_max);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_ps(max_ptr + row, mm_max);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}

由于混合内在函数,上面的代码需要 SSE4.1。您可以将这些替换为_mm_and_si128/_ps_mm_andnot_si128/_ps_mm_or_si128/_ps 的组合,在这种情况下,要求将降低到 SSE2。有关特定内在函数的更多详细信息,包括它们需要哪些指令集扩展,请参阅 Intel Intrinsics Guide


关于 NaN 值的说明。如果您的矩阵可以包含 NaN,_mm_cmplt_ps 测试将始终返回 false。至于_mm_max_ps,一般不知道会返回什么。如果任一操作数是 NaN,则内部转换为的 maxps 指令返回其第二个(源)操作数,因此通过排列指令的操作数,您可以实现任一行为。但是,没有记录 _mm_max_ps 内在函数的哪个参数代表指令的哪个操作数,甚至编译器可能在不同情况下使用不同的关联。更多详情见this答案。

为了确保正确的行为。 NaN,您可以使用内联汇编程序强制 maxps 操作数的正确顺序。不幸的是,这不是您所说的用于 x86-64 目标的 MSVC 的选项,因此您可以重用 _mm_cmplt_ps 结果进行第二次混合,如下所示:

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);

这将抑制结果最大值中的 NaN。如果您想保留 NaN,则可以使用第二个比较来检测 NaN:

// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));

如果您使用更宽的向量(__m256__m512)并将外循环展开一小部分,您可能会进一步提高上述算法的性能,以便至少加载相当于行数据的缓存行在内循环的每次迭代中。


这是double 的实现示例。这里要注意的重要一点是,因为每个向量只有两个double元素,而且每个向量仍然有四个位置,所以我们必须展开外循环以一次处理double的两个向量,然后压缩两个掩码与之前的最大值进行比较,以混合 32 位位置。

void find_max(const int n_cols, 
         const int n_rows, 
         const double* val_ptr,
         int* opt_pos_ptr,
         double* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 2 doubles, but we want to process
    // two vectors at a time.
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);
            __m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            __m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
            // Compress the 2 masks into one
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);
            mm_max2 = _mm_max_pd(mm_value2, mm_max2);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        _mm_storeu_pd(max_ptr + row + 2, mm_max2);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process 2 doubles at once
    if (tail_size >= 2)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            // Compress the mask. The upper half doesn't matter.
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        // Only store the lower two positions
        _mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);

        row += 2;
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}

【讨论】:

  • _mm_cmplt_ps_mm_max_ps 而不是 eq 并行之后会有更多的 ILP,如果你小心使用它可能有助于解决 NaN 问题。此外,在大步前进时至少执行 1 个完整的向量缓存行是一个好主意,尤其是在数据按 64 或 128 对齐的情况下,因此您一次触摸每个缓存行,而不是跨遍分开。
  • @PeterCordes 谢谢,是的,_mm_cmplt_ps 确实更好。我已经更新了答案。
  • 我认为_mm_max_ps 应该尊重参数顺序,如 asm。但是 GCC7 之前的 GCC 即使没有 -ffast-math 也将其视为可交换的,因此实际上您只能在 clang、ICC 和 MSVC 上安全地使用它(我没有测试 MSVC)。见What is the instruction that gives branchless FP min and max on x86?。但是,是的,如果您想要移植到 GCC6 和更早版本,您需要假设编译器会将其视为可交换的,而不是依赖其 NaN 行为。
  • @PeterCordes 我在 SDM 或 Intel Intrinsics Guide 中没有找到操作数关联参数的任何指示,因此除非您想依赖特定编译器的(未记录的)行为,否则最好编写可移植代码.具有讽刺意味的是,在这种情况下,asm 块如何更便携。
  • @EnzoFerrazzano 我已经用double 的版本更新了答案。
猜你喜欢
  • 2019-05-29
  • 1970-01-01
  • 2022-01-11
  • 1970-01-01
  • 1970-01-01
  • 2015-10-24
  • 2012-11-08
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多