【问题标题】:Why does loop unrolling have no effect for a large dataset?为什么循环展开对大型数据集没有影响?
【发布时间】:2014-11-28 16:23:28
【问题描述】:

我想对应用在 triangle 对象上的展开循环和 for 循环之间的执行速度差异进行基准测试。整个示例可用here

完整代码如下:

#include <iostream>
#include <vector>
#include <array>
#include <random>
#include <functional>
#include <chrono>
#include <fstream>

template<typename RT>
class Point 
{
    std::array<RT,3> data; 

    public: 

        Point() = default;

        Point(std::initializer_list<RT>& ilist)
            :
                data(ilist)
        {}

        Point(RT x, RT y, RT z)
            :
                data({x,y,z})
        {};

        RT& operator[](int i)
        {
            return data[i];  
        }

        RT operator[](int i) const
        {
            return data[i];
        }

        const Point& operator += (Point const& other)
        {
            data[0] += other.data[0];
            data[1] += other.data[1];
            data[2] += other.data[2];

            return *this; 
        }

        const Point& operator /= (RT const& s)
        {
            data[0] /= s; 
            data[1] /= s;  
            data[2] /= s;  

            return *this;
        }

};

template<typename RT>
Point<RT> operator-(const Point<RT>& p1, const Point<RT>& p2)
{
    return Point<RT>(p1[0] - p2[0], p1[1] - p2[1], p1[2] - p2[2]);
}

template<typename RT>
std::ostream& operator<<(std::ostream& os , Point<RT> const& p)
{
    os << p[0] << " " << p[1] << " " << p[2]; 
    return os;
}

template<typename Point>
class Triangle 
{
    std::array<Point, 3> points; 

    public: 

        typedef typename std::array<Point, 3>::value_type value_type;

        typedef Point PointType; 

        Triangle() = default; 

        Triangle(std::initializer_list<Point>& ilist) 
            :
                points(ilist)
        {}

        Triangle(Point const& p1, Point const& p2, Point const& p3)
            :
                points({p1, p2, p3})
        {}

        Point& operator[](int i)
        {
            return points[i]; 
        }

        Point operator[](int i) const
        {
            return points[i]; 
        }

        auto begin()
        {
            return points.begin(); 
        }

        const auto begin() const
        {
            return points.begin(); 
        }

        auto end()
        {
            return points.end(); 
        }

        const auto end() const
        {
            return points.end(); 
        }
};

template<typename Triangle>
typename Triangle::PointType barycenter_for(Triangle const& triangle)
{
    typename Triangle::value_type barycenter; 

    for (const auto& point : triangle)
    {
        barycenter += point; 
    }

    barycenter /= 3.; 

    return barycenter; 
}

template<typename Triangle>
typename Triangle::PointType barycenter_unrolled(Triangle const& triangle)
{
    typename Triangle::PointType barycenter; 

    barycenter += triangle[0]; 
    barycenter += triangle[1]; 
    barycenter += triangle[2]; 

    barycenter /= 3.; 

    return barycenter; 
}

template<typename TriangleSequence>
typename TriangleSequence::value_type::value_type
barycenter(
    TriangleSequence const& triangles, 
    std::function
    <
        typename TriangleSequence::value_type::value_type (
            typename TriangleSequence::value_type const &
         )
    > triangle_barycenter 
)
{
    typename TriangleSequence::value_type::value_type barycenter; 

    for(const auto& triangle : triangles)
    {
        barycenter += triangle_barycenter(triangle); 
    }

    barycenter /= double(triangles.size()); 

    return barycenter; 
}

using namespace std;

int main(int argc, const char *argv[])
{
    typedef Point<double> point; 
    typedef Triangle<point> triangle; 

    const int EXP = (atoi(argv[1]));

    ofstream outFile; 
    outFile.open("results.dat",std::ios_base::app); 

    const unsigned int MAX_TRIANGLES = pow(10, EXP);

    typedef std::vector<triangle> triangleVector; 

    triangleVector triangles;

    std::random_device rd;
    std::default_random_engine e(rd());
    std::uniform_real_distribution<double> dist(-10,10); 

    for (unsigned int i = 0; i < MAX_TRIANGLES; ++i)
    {
        triangles.push_back(
            triangle(
                point(dist(e), dist(e), dist(e)),
                point(dist(e), dist(e), dist(e)),
                point(dist(e), dist(e), dist(e))
            )
        );
    }

    typedef std::chrono::high_resolution_clock Clock; 

    auto start = Clock::now();
    auto trianglesBarycenter = barycenter(triangles, [](const triangle& tri){return barycenter_for(tri);});
    auto end = Clock::now(); 

    auto forLoop = end - start; 

    start = Clock::now();
    auto trianglesBarycenterUnrolled = barycenter(triangles, [](const triangle& tri){return barycenter_unrolled(tri);});
    end = Clock::now(); 

    auto unrolledLoop = end - start; 

    cout << "Barycenter difference (should be a zero vector): " << trianglesBarycenter - trianglesBarycenterUnrolled << endl;

    outFile << MAX_TRIANGLES << " " << forLoop.count() << " " << unrolledLoop.count() << "\n"; 

    outFile.close();

    return 0;
}

该示例由 Point 类型和 Triangle 类型组成。基准计算是三角形重心的计算。可以通过 for 循环来完成:

for (const auto& point : triangle)
{
    barycenter += point; 
}

barycenter /= 3.; 

return barycenter; 

或者它可以展开,因为三角形有三个点:

barycenter += triangle[0]; 
barycenter += triangle[1]; 
barycenter += triangle[2]; 

barycenter /= 3.; 

return barycenter; 

所以我想测试对于一组三角形来说,计算重心的函数会更快。为了充分利用测试,我通过执行带有整数指数参数的主程序来使正在操作的三角形的数量可变:

./main 6

产生 10^6 个三角形。三角形的数量范围从 100 到 1e06。主程序创建“results.dat”文件。为了分析结果,我编写了一个小的 Python 脚本:

#!/usr/bin/python

from matplotlib import pyplot as plt
import numpy as np
import os

results = np.loadtxt("results.dat")

fig = plt.figure()

ax1 = fig.add_subplot(111)
ax2 = ax1.twinx()

ax1.loglog(); 
ax2.loglog();

ratio = results[:,1] / results[:,2]

print("Speedup factors unrolled loop / for loop: ")
print(ratio)

l1 = ax1.plot(results[:,0], results[:,1], label="for loop", color='red')
l2 = ax1.plot(results[:,0], results[:,2], label="unrolled loop", color='black')
l3 = ax2.plot(results[:,0], ratio, label="speedup ratio", color='green')

lines  = [l1, l2, l3]; 

ax1.set_ylabel("CPU count")
ax2.set_ylabel("relative speedup: unrolled loop / for loop")

ax1.legend(loc="center right")
ax2.legend(loc="center left")

plt.savefig("results.png")

要利用您计算机上的所有内容,请复制示例代码,并使用以下命令进行编译:

g++ -std=c++1y -O3 -Wall -pedantic -pthread main.cpp -o main

要绘制不同重心函数的测量 CPU 时间,请执行 python 脚本(我称之为plotResults.py):

 for i in {1..6}; do ./main $i; done
./plotResults.py

我期望看到的是展开循环和for 循环之间的相对加速(循环时间/展开循环时间)将随着三角形集的大小而增加。这个结论可以从一个逻辑得出:如果展开的循环比 for 循环快,则执行大量展开的循环应该比执行大量的 for 循环更快。以下是上述 python 脚本生成的结果图:

循环展开的影响很快就消失了。一旦我使用超过 100 个三角形,似乎没有区别。查看 python 脚本计算的加速比:

[ 3.13502399  2.40828402  1.15045831  1.0197221   1.1042312   1.26175165
  0.99736715]

使用 100 个三角形(列表中的 3d 位置对应于 10^2)时的加速比为 1.15。

我来这里是为了找出我做错了什么,因为这里一定有什么问题,恕我直言。 :) 提前致谢。

编辑:绘制 cachegrind 缓存未命中率

如果程序是这样运行的:

for input in {2..6}; do valgrind --tool=cachegrind  ./main $input; done

cachegrind生成一堆输出文件,可以用grep解析为PROGRAM TOTALS,代表以下数据的数字列表,取自cachegrind manual

Cachegrind 收集以下统计信息(用于 每个统计数据都在括号中给出):

I cache reads (Ir, which equals the number of instructions executed), I1 cache read misses (I1mr) and LL cache instruction read

未命中 (ILmr)。

D cache reads (Dr, which equals the number of memory reads), D1 cache read misses (D1mr), and LL cache data read misses (DLmr).

D cache writes (Dw, which equals the number of memory writes), D1 cache write misses (D1mw), and LL cache data write misses (DLmw).

Conditional branches executed (Bc) and conditional branches mispredicted (Bcm).

Indirect branches executed (Bi) and indirect branches mispredicted (Bim).

而“组合”缓存未命中率定义为:(ILmr + DLmr + DLmw) / (Ir + Dr + Dw)

输出文件可以这样解析:

for file in cache*; do cg_annotate $file | grep TOTALS >> program-totals.dat; done
sed -i 's/PROGRAM TOTALS//'g program-totals.dat

然后可以使用这个 python 脚本来可视化生成的数据:

#!/usr/bin/python
from matplotlib import pyplot as plt
import numpy as np
import os
import locale

totalInput = [totalInput.strip().split(' ') for totalInput in open('program-totals.dat','r')]

locale.setlocale(locale.LC_ALL, 'en_US.UTF-8' ) 

totals = []

for line in totalInput:
    totals.append([locale.atoi(item) for item in line])

totals = np.array(totals)

# Assumed default output format
# Ir I1mr  ILmr Dr Dmr DLmr Dw D1mw DLmw
# 0   1     2   3   4   5   6   7    8
cacheMissRatios = (totals[:,2] + totals[:,5] + totals[:,8]) / (totals[:,0] + totals[:,3] + totals[:,6])

fig = plt.figure()
ax1 = fig.add_subplot(111)
ax1.loglog()

results = np.loadtxt("results.dat")
l1 = ax1.plot(results[:,0], cacheMissRatios, label="Cachegrind combined cache miss ratio", color='black', marker='x')
l1 = ax1.plot(results[:,0], results[:,1] / results[:,2], label="Execution speedup", color='blue', marker='o')

ax1.set_ylabel("Cachegrind combined cache miss ratio")
ax1.set_xlabel("Number of triangles")
ax1.legend(loc="center left")

plt.savefig("cacheMisses.png")

因此,当三角形访问循环展开时,绘制组合的 LL 未命中率与程序加速比,得到下图:

LL 错误率似乎存在依赖性:随着它的上升,程序的加速下降。但是,我仍然看不到瓶颈的明确原因。

综合 LL 未命中率是否适合分析?查看 valgrind 的输出,据报道所有未命中率都低于 5%,这应该还不错吧?

【问题讨论】:

  • 根据您运行代码的位置,计时仅准确到 15 毫秒,也就是永恒。检查代码,但编译器应该能够自行优化。
  • 如果编译器自己优化,为什么小三角形的执行时间会有很大差异?我希望对一小组三角形执行循环展开,而不是针对 1e06 个三角形...
  • 你能发布你完整的c代码吗?仅从描述中很难知道您是如何测试的。例如你如何测量时间。
  • 你也忘了传递-DNDEBUG。可能会影响迭代器速度。
  • @doc:我使用了另一个程序并从 argv[1] 初始化了指数。我正在使用 valgrind 检查 LL 缓存中的缓存未命中,以查看那里是否存在问题。正如预期的那样,大型数据集的缓存未命中率变为 0% - 这是一个巨大的向量。

标签: c++ optimization loop-unrolling


【解决方案1】:

即使展开,barycenter 的计算也是一次完成的。此外,计算的每一步(对于单个重心)都依赖于前一步,这意味着它们不能并行化。您当然可以通过一次计算 n 重心而不是只计算一个重心来获得更好的吞吐量,并对 n 的各种值进行基准测试以确定哪些数量会使 CPU 管道饱和。

另一个可能有助于加快计算速度的方面是 数据布局:您可以尝试将三角形点拆分为 3 个不同的数组(每个点一个),而不是将三角形点一起存储在一个结构中,并再次对n 使用不同的值进行基准测试。

关于您的主要问题,除非代码转换降低了底层算法的复杂程度(这是完全可能的),否则获得的速度最多应该与数据集大小成线性关系,但是如果数据集足够大,它可能会达到不同的限制(例如,当一级内存 - 缓存级别 1、级别 2、主内存 - 饱和时会发生什么?)。

【讨论】:

  • 谢谢,我会试试你的建议。关于第二段:我认为我无法转换三角形的数据布局,因为其他算法依赖于它们被打包在一起。使用间接寻址会增加操作三角形集合的算法的复杂性。
  • 确实,第二次转型更雄心勃勃。顺便说一句,很高兴看到一个写得很好的 C++ 代码的问题,谢谢你的提问。
  • 非常感谢您的帮助 + 对 C++ 代码的积极评价。 :)
【解决方案2】:
  1. 对于小型数据集,您的第二个 BarycenterUnrolled 循环更快,因为数据集足够小,可以进行 L2/L3 缓存优化。尝试交换您在程序中运行测试的顺序。一个看似合乎逻辑的决定可能是将测试作为单独的进程运行,但这也并不总是有效:L2/L3 缓存是持久的。每个过程的后续运行可能会产生不同的结果。 (详情见下文)

  2. 您在频谱上观察到的其余差异是噪声。在这两种情况下,您的 GCC 编译器都会生成几乎相同的代码。当指定 -O3 时,GCC 以积极展开循环(例如那个循环)而闻名。事实上,GCC 在某些情况下会展开循环多达 16 或 24 次迭代——这有时会损害某些移动芯片架构的性能。

此外,您可以使用 -fno-unroll-loops 进行测试...虽然我怀疑您会看到很多差异,因为您的算法的主要瓶颈按以下顺序排列:

  1. 除法运算 (/= 3)
  2. 内存

关于在短数据集上运行适当的基准测试:

为了避免对短数据集产生 L2/L3 缓存噪音,您需要在每次基准测试之前刷新缓存。这通常是通过在 ~16MB - 32MB 的堆中分配一些大块数据并对其进行读/写垃圾来完成的。在您的情况下,为每个测试构建完全不同的三角形列表也是可取的。

但最好的建议通常是:“不要在小型数据集上运行基准测试”。相反,只在非常大的数据集上运行基准测试,然后在大数据集上也使用最好的用于小数据集。这适用于微优化情况,例如循环展开或 cpu 指令计数。如果您正在执行更高级别的算法,例如排序或树遍历,并且知道您的主要用例将是小型数据集,那么应该使用一组不同的基准标准。在这些情况下,我更喜欢通过连接数十个小数据集来创建一个“大”数据集。这将强调算法中可能成为小型数据集瓶颈的部分,例如设置和结果处理。

【讨论】:

    【解决方案3】:

    循环展开可以节省循环的开销。如果执行循环所需的时间比执行循环的每个单次迭代的时间小,那么您将不会节省太多。

    情况可能更糟。您的处理器有许多独立工作的单元。例如,您可能有一个内存单元、一个浮点单元和一个整数单元。您的代码将花费这些单元中最慢的时间。循环(递增索引,检查它是否足够小,从循环的开头开始)由整数单元执行。如果您的代码在内存单元中需要 100 毫秒,在浮点单元中需要 80 毫秒,在整数单元中需要 60 毫秒,那么它需要 100 毫秒。浮点或整数单位的任何节省都不会让它更快。

    请注意,对于小示例,所有数据都适合缓存。因此内存单元将花费相对较少的时间。假设您有一个小样本,没有展开需要 60µs(内存)、60µs(浮点)和 80µs(整数)。现在循环展开可以帮助将总时间从 80µs 减少到 60µs。

    【讨论】:

      猜你喜欢
      • 2012-06-15
      • 2018-06-04
      • 2019-07-12
      • 1970-01-01
      • 2013-01-18
      • 2019-09-03
      • 1970-01-01
      • 1970-01-01
      • 2021-03-07
      相关资源
      最近更新 更多