【问题标题】:Efficiently calculate mean in a large C++ array有效计算大型 C++ 数组中的平均值
【发布时间】:2021-06-06 14:50:57
【问题描述】:

我有一个包含 7 个块的 65K 样本的数据集:float arr[7][65536] 我需要计算每个相应 7 个数字的平均值,因此结果将是一个大小为 65536 的数组:

float result[0] = arr[0][0] + arr[1][0] + arr[2][0] + ... / 7
float result[1] = arr[0][1] + arr[1][1] + arr[2][1] + ... / 7

问题是不按顺序访问内存会造成很多缓存未命中,在内存方面有没有更好的方法来解决这个问题? 提前对数组进行整形也会导致内存效率低下。

谢谢。

【问题讨论】:

  • 发布创建这个二维数组的实际代码。我们不知道您是否以天真的方式动态创建了这个数组,数组元素散布在整个堆中,它是否是数据位于连续内存中的“真正”二维数组,等等。
  • 也许,先for (size_t j = 0; j < 65536; ++j) result[j] = array[0][j];,然后for (size_t i = 1; i < 7; ++i) for (size_t j = 0; j < 65536; ++j) result[j] += array[i][j];;然后for (size_t j = 0; j < 65536; ++j) result[j] /= 7;
  • @PaulMcKenzie - 这是一个真正的二维数组,在内存中是连续的。

标签: c++ arrays performance caching


【解决方案1】:

一个简单的解决方案是遍历连续维度。但是,如果sizeof(arr[0][0]) 很大,这可能不是最佳的,因为result 不适合L1 缓存。最佳解决方案可能是使用阻塞来解决这个问题。

这是一个 C++ 示例代码:

// Blocked reduction using blocks of size 1024
// This loop iterate over the blocks
for(size_t j=0 ; j<65536 ; j+=1024)
{
    for(size_t k=j ; k<j+1024 ; ++k)
        result[k] = arr[0][k];

    // Summation of the current block
    for(size_t i=1 ; i<7 ; ++i)
        for(size_t k=j ; k<j+1024 ; ++k)
            result[k] += arr[i][k];

    for(size_t k=j ; k<j+1024 ; ++k)
        result[k] /= 7;
}

请注意,result 的类型必须足够大,以容纳比 arr[0][0] 的类型小/大 7 倍的值。这个循环应该由你的编译器向量化,并且应该产生更少的缓存未命中,从而产生更快的代码。

PS:如果循环未矢量化,您可以通过在基于 k 的内部循环之前添加 OpenMP 指令 #pragma omp simd 来帮助编译器(并确保启用 OpenMP)。

【讨论】:

  • 根据quick-bench,在clang 中使用int 时,这比我的幼稚解决方案快约1.1 倍。使用gcc,您的版本是~1.2 times faster
【解决方案2】:

希望在内部数组周围保持内部循环将充分利用缓存线(因此比在外部数组周围进行内部循环更快),您可以试试这个:

for(const auto& inner_arr : arr) {
    for(size_t i = 0; i < std::size(inner_arr); ++i) {
        result[i] += inner_arr[i];
    }
}
for(auto& v : result) v /= std::size(arr); // div down to mean value

但你必须衡量它是否真的比你现在拥有的更快/更有效。

另一种方法可能是使用 C++17 中添加的执行策略。我添加了一个 counting iterator 来让它工作。如果你有boost,你可以使用他们的counting iterator

struct counting_iterator {
    using iterator_category = std::forward_iterator_tag;
    using value_type = size_t;
    using pointer = value_type*;
    using referece = value_type&;
    using difference_type = std::intmax_t;

    counting_iterator& operator++() { ++value; return *this; }
    counting_iterator operator++(int) { auto copy=*this; ++value; return copy; }
    bool operator==(const counting_iterator& rhs) const { return value == rhs.value; }
    bool operator!=(const counting_iterator& rhs) const { return value != rhs.value; }

    value_type operator*() const { return value; }

    size_t value;
}

// ...

std::for_each(std::execution::par_unseq, counting_iterator{0}, counting_iterator{65536},
    [&arr, &result](size_t i) {
        for(size_t o = 0; o < 7; ++o)
            result[i] += arr[o][i];
        result[i] /= 7;
    });

当使用 gcc 和 int 作为数据类型时,这比我上面根据 quick-bench~2.1 times faster 与 clang 的简单解决方案快约 1.4 倍。这可能会有很大差异,具体取决于您有多少硬件线程可用。

【讨论】:

    猜你喜欢
    • 2022-11-12
    • 2019-04-08
    • 2014-01-23
    • 2016-10-24
    • 1970-01-01
    • 1970-01-01
    • 2013-10-25
    • 1970-01-01
    • 2014-05-14
    相关资源
    最近更新 更多