【问题标题】:How should I improve the performance of this C++ code?我应该如何提高这个 C++ 代码的性能?
【发布时间】:2013-10-15 04:01:00
【问题描述】:

以下代码对两个std::vectors v1v2 进行操作,每个都包含多个128 元素向量。通过外部向量的循环(使用i1i2)包含一个内部循环,旨在限制i1i2 的组合,对其执行进一步的复杂处理。过滤掉了大约 99.9% 的组合。

不幸的是,过滤循环是我程序中的主要瓶颈 - 分析表明,整个运行时间的 26% 都花在了 if(a[k] + b[k] > LIMIT) 行上。

const vector<vector<uint16_t>> & v1 = ...
const vector<vector<uint16_t>> & v2 = ...

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000
    for(size_t i2 = 0; i2 < v2.size(); ++i2) {

        const vector<uint16_t> & a = v1[i1];
        const vector<uint16_t> & b = v2[i2];

        bool good = true;
        for(std::size_t k = 0; k < 128; ++k) {
            if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000
                good = false;
                break;
            }
        }
        if(!good) continue;

        // Further processing involving i1 and i2
    }
}

我认为可以通过增加内存局部性以及向量化来提高这段代码的性能。关于如何做到这一点或其他可以改进的建议?

【问题讨论】:

  • 应该改用 codereview (他们应该已经把它放到 offtopic 关闭投票列表中)
  • 听起来像分支预测问题。 sse 会解决它
  • 数据结构可以简化吗?
  • 微优化:用goto代替good/continue。根据您的数据,您可以对向量进行排序,或者当单个元素本身 > 128 时排除完整的 i1/i2。#
  • 如果内部向量总是size = 128,则使用std::array,减少一个间接到达元素。

标签: c++ optimization x86 sse simd


【解决方案1】:

您可以将 SIMD 应用于内部循环:

    bool good = true;
    for(std::size_t k = 0; k < 128; ++k) {
        if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000
            good = false;
            break;
        }

如下:

#include <emmintrin.h>  // SSE2 intrinsics
#include <limits.h>     // SHRT_MIN

// ...

    // some useful constants - declare these somewhere before the outermost loop

    const __m128i vLIMIT = _mm_set1_epi16(LIMIT + SHRT_MIN); // signed version of LIMIT
    const __m128i vOFFSET = _mm_set1_epi16(SHRT_MIN);        // offset for uint16_t -> int16_t conversion

// ...

    bool good = true;
    for(std::size_t k = 0; k < 128; k += 8) {
        __m128i v, va, vb;              // iterate through a, b, 8 elements at a time
        int mask;
        va = _mm_loadu_si128(&a[k]);    // get 8 elements from a[k], b[k]
        vb = _mm_loadu_si128(&b[k]);
        v = _mm_add_epi16(va, vb);      // add a and b vectors
        v = _mm_add_epi16(v, vOFFSET);  // subtract 32768 to make signed
        v = _mm_cmpgt_epi16(v, vLIMIT); // compare against LIMIT
        mask = _mm_maskmove_epi8(v);    // get comparison results as 16 bit mask
        if (mask != 0) {                // if any value exceeded limit
            good = false;               // clear good flag and exit loop
            break;
        }

警告:未经测试的代码 - 可能需要调试,但一般方法应该是合理的。

【讨论】:

  • @BЈовић:是的,不幸的是,您对 std::vector 的对齐没有太多控制权。幸运的是,尽管在现代 CPU 上未对齐的负载并不太昂贵。
【解决方案2】:

对于v1,您已经获得了最有效的访问模式,但是对于外部循环的每次迭代,您都按顺序扫描了所有v2。这是非常低效的,因为v2 访问会不断导致(L2 可能还有 L3)缓存未命中。

更好的访问模式是增加循环嵌套,以便外部循环跨越v1v2,而内部循环处理v1v2 的子段中的元素,该子段足够小以适合在缓存中。

基本上,而不是

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000
    for(size_t i2 = 0; i2 < v2.size(); ++i2) {

for(size_t i2a = 0; i2a < v2.size(); i2a += 32) {
    for(size_t i1 = 0; i1 < v1.size(); ++i1) {
        for(size_t i2 = i2a; i2 < v2.size() && i2 < i2a + 32; ++i2) {

或者

size_t i2a = 0;

// handle complete blocks
for(; i2a < v2.size() - 31; i2a += 32) {
    for(size_t i1 = 0; i1 < v1.size(); ++i1) {
        for(size_t i2 = i2a; i2 < i2a + 32; ++i2) {

        }
    }
}

// handle leftover partial block
for(size_t i1 = 0; i1 < v1.size(); ++i1) {
    for(size_t i2 = i2a; i2 < v2.size(); ++i2) {
    }
}

这样,一大块32 * 128 * sizeof (uint16_t) 字节,即8kB,将从v2 加载到缓存中,然后重复使用20,000 次。

此改进与 SIMD (SSE) 矢量化正交。它将与基于线程的并行性交互,但可能是一种很好的方式。

【讨论】:

    【解决方案3】:

    首先,一个简单的优化可以是这样,但编译器可以自己完成,所以我不确定它可以改进多少:

    for(std::size_t k = 0; k < 128 && good; ++k)
    {
        good = a[k] + b[k] <= LIMIT;
    }
    

    其次,我认为最好将好的结果保留在第二个向量中,因为任何 与 i1 和 i2 相关的处理可能会破坏 CPU 缓存。

    第三,这可能是主要的优化,我认为您可以将第二个 for 循环重写为: for(size_t i2 = i1; i2

    堡垒,据我所知,您正在处理两个矩阵,最好保留元素向量而不是向量向量。

    希望这会有所帮助。 拉兹万。

    【讨论】:

      【解决方案4】:

      一些建议:

      • 按照 cmets 中的建议,将内部 128 元素向量替换为数组以获得更好的内存局部性。
      • 这段代码看起来高度可并行化,你试过吗?您可以拆分组合以在所有可用内核中进行过滤,然后重新平衡收集的工作并将处理拆分到所有内核中。

      我实现了一个版本,它使用数组作为内部 128 个元素,使用 PPL 进行并行化(需要 VS 2012 或更高版本)以及一些用于过滤的 SSE 代码,并获得了相当显着的加速。根据“进一步处理”所涉及的具体内容,稍微不同地构建事物可能会带来好处(例如,在此示例中,我不会在过滤后重新平衡工作)。

      更新:我实现了 Ben Voigt 建议的缓存阻塞,并获得了更多的加速。

      #include <vector>
      #include <array>
      #include <random>
      #include <limits>
      #include <cstdint>
      #include <iostream>
      #include <numeric>
      #include <chrono>
      #include <iterator>
      
      #include <ppl.h>
      
      #include <immintrin.h>
      
      using namespace std;
      using namespace concurrency;
      
      namespace {
      const int outerVecSize = 20000;
      const int innerVecSize = 128;
      const int LIMIT = 16000;
      auto engine = default_random_engine();
      };
      
      typedef vector<uint16_t> InnerVec;
      typedef array<uint16_t, innerVecSize> InnerArr;
      
      template <typename Cont> void randomFill(Cont& c) {
          // We want approx 0.1% to pass filter, the mean and standard deviation are chosen to get close to that
          static auto dist = normal_distribution<>(LIMIT / 4.0, LIMIT / 4.6);
          generate(begin(c), end(c), [] {
              auto clamp = [](double x, double minimum, double maximum) { return min(max(minimum, x), maximum); };
              return static_cast<uint16_t>(clamp(dist(engine), 0.0, numeric_limits<uint16_t>::max()));
          });
      }
      
      void resizeInner(InnerVec& v) { v.resize(innerVecSize); }
      void resizeInner(InnerArr& a) {}
      
      template <typename Inner> Inner generateRandomInner() {
          auto inner = Inner();
          resizeInner(inner);
          randomFill(inner);
          return inner;
      }
      
      template <typename Inner> vector<Inner> generateRandomInput() {
          auto outer = vector<Inner>(outerVecSize);
          generate(begin(outer), end(outer), generateRandomInner<Inner>);
          return outer;
      }
      
      void Report(const chrono::high_resolution_clock::duration elapsed, size_t in1Size, size_t in2Size,
                  const int passedFilter, const uint32_t specialValue) {
          cout << passedFilter << "/" << in1Size* in2Size << " ("
               << 100.0 * (double(passedFilter) / double(in1Size * in2Size)) << "%) passed filter\n";
          cout << specialValue << "\n";
          cout << "Elapsed time = " << chrono::duration_cast<chrono::milliseconds>(elapsed).count() << "ms" << endl;
      }
      
      void TestOriginalVersion() {
          cout << __FUNCTION__ << endl;
      
          engine.seed();
          const auto v1 = generateRandomInput<InnerVec>();
          const auto v2 = generateRandomInput<InnerVec>();
      
          int passedFilter = 0;
          uint32_t specialValue = 0;
      
          auto startTime = chrono::high_resolution_clock::now();
      
          for (size_t i1 = 0; i1 < v1.size(); ++i1) { // v1.size() and v2.size() about 20000
              for (size_t i2 = 0; i2 < v2.size(); ++i2) {
      
                  const vector<uint16_t>& a = v1[i1];
                  const vector<uint16_t>& b = v2[i2];
      
                  bool good = true;
                  for (std::size_t k = 0; k < 128; ++k) {
                      if (static_cast<int>(a[k]) + static_cast<int>(b[k])
                          > LIMIT) { // LIMIT is a const uint16_t: approx 16000
                          good = false;
                          break;
                      }
                  }
                  if (!good) continue;
      
                  // Further processing involving i1 and i2
                  ++passedFilter;
                  specialValue += inner_product(begin(a), end(a), begin(b), 0);
              }
          }
      
          auto endTime = chrono::high_resolution_clock::now();
      
          Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue);
      }
      
      bool needsProcessing(const InnerArr& a, const InnerArr& b) {
          static_assert(sizeof(a) == sizeof(b) && (sizeof(a) % 16) == 0, "Array size must be multiple of 16 bytes.");
          static const __m128i mmLimit = _mm_set1_epi16(LIMIT);
          static const __m128i mmLimitPlus1 = _mm_set1_epi16(LIMIT + 1);
          static const __m128i mmOnes = _mm_set1_epi16(-1);
      
          auto to_m128i = [](const uint16_t* p) { return reinterpret_cast<const __m128i*>(p); };
          return equal(to_m128i(a.data()), to_m128i(a.data() + a.size()), to_m128i(b.data()), [&](const __m128i& a, const __m128i& b) {
              // avoid overflow due to signed compare by clamping sum to LIMIT + 1
              const __m128i clampSum = _mm_min_epu16(_mm_adds_epu16(a, b), mmLimitPlus1);
              return _mm_test_all_zeros(_mm_cmpgt_epi16(clampSum, mmLimit), mmOnes);
          });
      }
      
      void TestArrayParallelVersion() {
          cout << __FUNCTION__ << endl;
      
          engine.seed();
          const auto v1 = generateRandomInput<InnerArr>();
          const auto v2 = generateRandomInput<InnerArr>();
      
          combinable<int> passedFilterCombinable;
          combinable<uint32_t> specialValueCombinable;
      
          auto startTime = chrono::high_resolution_clock::now();
      
          const size_t blockSize = 64;
      
          parallel_for(0u, v1.size(), blockSize, [&](size_t i) {
              for (const auto& b : v2) {
                  const auto blockBegin = begin(v1) + i;
                  const auto blockEnd = begin(v1) + min(v1.size(), i + blockSize);
                  for (auto it = blockBegin; it != blockEnd; ++it) {
                      const InnerArr& a = *it;
                      if (!needsProcessing(a, b))
                          continue;
      
                      // Further processing involving a and b
                      ++passedFilterCombinable.local();
                      specialValueCombinable.local() += inner_product(begin(a), end(a), begin(b), 0);
                  }
              }
          });
      
          auto passedFilter = passedFilterCombinable.combine(plus<int>());
          auto specialValue = specialValueCombinable.combine(plus<uint32_t>());
      
          auto endTime = chrono::high_resolution_clock::now();
      
          Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue);
      }
      
      int main() {
          TestOriginalVersion();
          TestArrayParallelVersion();
      }
      

      在我的 8 核系统上,我看到了相当不错的加速,你的结果会根据你拥有的核心数量等而有所不同。

      TestOriginalVersion
      441579/400000000 (0.110395%) passed filter
      2447300015
      Elapsed time = 12525ms
      TestArrayParallelVersion
      441579/400000000 (0.110395%) passed filter
      2447300015
      Elapsed time = 657ms
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-07-20
        • 2019-11-19
        • 1970-01-01
        • 1970-01-01
        • 2010-10-07
        • 1970-01-01
        • 2016-04-18
        相关资源
        最近更新 更多