【问题标题】:Why does this Cilk Array Notation matrix perform nearly twice as slow as the serial version?为什么这个 Cilk Array Notation 矩阵的执行速度几乎是串行版本的两倍?
【发布时间】:2015-03-19 16:35:40
【问题描述】:

我正在对串行和 Cilk 数组表示法版本的矩阵乘法性能进行基准测试。 Cilk 实现所需的时间几乎是串行的两倍,我不明白为什么。

这是单核执行

这是 Cilk 乘法的精髓。由于排名限制,我必须在设置目标矩阵元素值之前将每个乘法存储在一个数组中,然后 __sec_reduce_add 这个数组。

int* sum = new int[VEC_SIZE];
for (int r = 0; (r < rowsThis); r++) {
    for (int c = 0; (c < colsOther); c++) {
        sum[0:VEC_SIZE] = myData[r][0:VEC_SIZE] * otherData[0:VEC_SIZE][c]; 
        product.data[r][c] = __sec_reduce_add(sum[0:VEC_SIZE]);
    }
}

我了解缓存问题,并且认为 Cilk 版本的缓存命中次数少于串行版本的任何原因,因为它们都访问希望在缓存中的列数组以及一系列未命中的行元素。

是否存在我忽略的明显依赖项或我应该使用的 Cilk 句法元素?我应该以不同的方式使用 Cilk 来实现使用 SIMD 操作的非块矩阵乘法的最大性能吗?

我是 Cilk 的新手,因此欢迎任何帮助/建议。

编辑:
这是串行实现:

for (int row = 0; (row < rowsThis); row++) {
    for (int col = 0; (col < colsOther); col++) {
        int sum = 0;
        for (int i = 0; (i < VEC_SIZE); i++)  {
            sum += (matrix1[row][i] * matrix2[i][col]);
        }
        matrix3[row][col] = sum;
    }
}

内存被适当地(取消)分配,并且乘法结果对于两种实现都是正确的。矩阵大小在编译时是未知的,并且在整个基准测试中使用了多种矩阵大小。

编译器:icpc (ICC) 15.0.0 20140723
编译标志:icpc -g Wall -O2 -std=c++11

忽略分配内存的使用、从向量到普通数组的转换等。我凭直觉破解了另一个程序来运行它,假设它比事实证明的要简单。我无法让编译器在二维向量的两个维度上都接受 cilk 表示法,所以我决定使用传统数组来进行基准测试。

以下是所有适当的文件:
矩阵测试.cpp

#include <string>
#include <fstream>
#include <stdlib.h>
#include "Matrix.h"


#define MATRIX_SIZE 2000
#define REPETITIONS 1

int main(int argc, char** argv) {

    auto init = [](int row, int col) { return row + col; };

    const Matrix matrix1(MATRIX_SIZE, MATRIX_SIZE, init);
    const Matrix matrix2(MATRIX_SIZE, MATRIX_SIZE, init);
    for (size_t i = 0; (i < REPETITIONS); i++) {
        const Matrix matrix3 = matrix1 * matrix2;
        std::cout << "Diag sum: " << matrix3.sumDiag() << std::endl;
    }    
    return 0;
}

矩阵.h

#ifndef MATRIX_H
#define MATRIX_H

#include <iostream>
#include <functional>
#include <vector>

using TwoDVec = std::vector<std::vector<int>>;

class Matrix {                                                                                                                                                                     
public:                                                                                                                                                                            
    Matrix();                                                                                                                                                                      
    ~Matrix();                                                                                                                                                                     
    Matrix(int rows, int cols);                                                                                                                                                    
    Matrix(int rows, int cols, std::function<Val(int, int)> init);                                                                                                                 
    Matrix(const Matrix& src);                                                                                                                                                     
    Matrix(Matrix&& src);                                                                                                                                                          
    Matrix operator*(const Matrix& src) const throw(std::exception);                                                                                                               
    Matrix& operator=(const Matrix& src);                                                                                                                                          
    int sumDiag() const;                                                                                                                                                           

 protected:                                                                                                                                                                        
    int** getRawData() const;                                                                                                                                                      
 private:                                                                                                                                                                          
    TwoDVec data;                                                                                                                                                                  
};

#endif

矩阵.cpp

#include <iostream>
#include <algorithm>
#include <stdexcept>
#include "Matrix.h"

#if defined(CILK)

Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
    int rowsOther = other.data.size();
    int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
    int rowsThis = data.size();
    int colsThis = rowsThis > 0 ? data[0].size() : 0;
    if (colsThis != rowsOther) {
        throw std::runtime_error("Invalid matrices for multiplication.");
    }

    int** thisRaw = this->getRawData();  // held until d'tor
    int** otherRaw = other.getRawData();

    Matrix product(rowsThis, colsOther);

    const int VEC_SIZE = colsThis;

    for (int r = 0; (r < rowsThis); r++) {
        for (int c = 0; (c < colsOther); c++) {
            product.data[r][c] = __sec_reduce_add(thisRaw[r][0:VEC_SIZE]
                                             * otherRaw[0:VEC_SIZE][c]);
        }
    }
    delete[] thisRaw;
    delete[] otherRaw;

    return product;
}

#elif defined(SERIAL)

Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
    int rowsOther = other.data.size();
    int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
    int rowsThis = data.size();
    int colsThis = rowsThis > 0 ? data[0].size() : 0;
    if (colsThis != rowsOther) {
        throw std::runtime_error("Invalid matrices for multiplication.");
    }

    int** thisRaw = this->getRawData();  // held until d'tor
    int** otherRaw = other.getRawData();

    Matrix product(rowsThis, colsOther);

    const int VEC_SIZE = colsThis;

    for (int r = 0; (r < rowsThis); r++) {
        for (int c = 0; (c < colsOther); c++) {
            int sum = 0;
            for (int i = 0; (i < VEC_SIZE); i++) {
                sum += (thisRaw[r][i] * otherRaw[i][c]);
            }
            product.data[r][c] = sum;
        }
    }
    delete[] thisRaw;
    delete[] otherRaw;

    return product;
}

#endif

//  Default c'tor
Matrix::Matrix()
    : Matrix(1,1) { }

Matrix::~Matrix() { }

// Rows/Cols c'tor
Matrix::Matrix(int rows, int cols)
    : data(TwoDVec(rows, std::vector<int>(cols, 0))) { }

//  Init func c'tor
Matrix::Matrix(int rows, int cols, std::function<int(int, int)> init)
    : Matrix(rows, cols) {
    for (int r = 0; (r < rows); r++) {
        for (int c = 0; (c < cols); c++) {
            data[r][c] = init(r,c);
        }
    }
}

//  Copy c'tor
Matrix::Matrix(const Matrix& other) {
    int rows = other.data.size();
    int cols = rows > 0 ? other.data[0].size() : 0;
    this->data.resize(rows, std::vector<int>(cols, 0));
    for(int r = 0; (r < rows); r++) {
        this->data[r] = other.data[r];
    }
}


//  Move c'tor
Matrix::Matrix(Matrix&& other) {
    if (this == &other) return;
    this->data.clear();
    int rows = other.data.size();
    for (int r = 0; (r < rows); r++) {
        this->data[r] = std::move(other.data[r]);
    }
}


Matrix&
Matrix::operator=(const Matrix& other) {
    int rows = other.data.size();
    this->data.resize(rows, std::vector<int>(0));
    for (int r = 0; (r < rows); r++) {
        this->data[r] = other.data[r];
    }
    return *this;
}

int
Matrix::sumDiag() const {
    int rows = data.size();
    int cols = rows > 0 ? data[0].size() : 0;

    int dim = (rows < cols ? rows : cols);
    int sum = 0;
    for (int i = 0; (i < dim); i++) {
        sum += data[i][i];
    }
    return sum;
}


int**
Matrix::getRawData() const {
    int** rawData = new int*[data.size()];
    for (int i = 0; (i < data.size()); i++) {
        rawData[i] = const_cast<int*>(data[i].data());
    }
    return rawData;
}

【问题讨论】:

  • VEC_SIZE 有多大,您使用的是什么编译器?序列号是什么样的?
  • rowsThis 和 colsOther 有多大? VEC_SIZE 是常数吗?你还记得释放使用“new int[VEC_SIZE]”创建的临时数组吗?
  • 我已经进行了适当的修改。矩阵大小的编译时感知是否为这些 Cilk 操作提供了改进的优化机会?
  • 大小可能有缓存效果。例如,如果 VEC_SIZE 很大,则临时数组总和可能会溢出 L1 缓存。
  • 另一个问题:是否涉及C99变长数组? IE。每个矩阵的最后一维是编译时常数吗?

标签: arrays caching vectorization matrix-multiplication cilk-plus


【解决方案1】:

[2015-3-30 更新以匹配长代码示例。]

icc 可能会自动矢量化您的累积循环,因此 Cilk Plus 正在与矢量化代码竞争。这里有两个可能的性能问题:

  1. 临时数组总和增加了加载和存储的数量。串行代码仅执行两次(SIMD)加载,并且每次(SIMD)乘法几乎没有存储。使用临时数组,每次乘法有 3 次加载和 1 次存储。

  2. matrix2 具有非单元跨度访问模式(在串行代码中也是如此)。典型的当前硬件在单位步长访问中工作得更好。

要解决问题 (1),请消除临时数组,就像您稍后在更长的样本中所做的那样。例如:

for (int r = 0; (r < rowsThis); r++) {
    for (int c = 0; (c < colsOther); c++) {
        product.data[r][c] = __sec_reduce_add(thisRaw[r][0:VEC_SIZE] * otherRaw[0:VEC_SIZE][c]);
    }
}

__sec_reduce_add 结果的秩为零,因此可以分配给标量。

更好的是,解决步幅问题。除非结果矩阵非常宽,否则一个好的方法是逐行累积结果,如下所示:

for (int r = 0; (r < rowsThis); r++) {
    product.data[r].data()[0:colsOther] = 0;
    for (int k = 0; (k < VEC_SIZE); k++) {
        product.data[r].data()[0:colsOther] +=thisRaw[r][k] * otherRaw[k][0:colsOther];
    }
}

注意这里使用data()。数组表示法目前不允许部分表示法与来自std::vector 的重载[] 一起使用。所以我使用data()来获取一个指向底层数据的指针,我可以将它与数组表示法一起使用。

现在数组部分都有单位步长,因此编译器可以有效地使用向量硬件。上面的方案通常是我最喜欢的构建无阻塞矩阵乘法的方法。当使用icpc -g -Wall -O2 -xHost -std=c++11 -DCILK 编译并在 i7-4770 处理器上运行时,我看到 MatrixTest 程序时间从大约 52 秒下降到 1.75 秒,性能提升了 29 倍。

通过消除归零(在构造product 时已经完成)和消除临时指针数组的构造,可以简化和加快代码速度。以下是修改后的 operator* 的完整定义:

Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
    int rowsOther = other.data.size();
    int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
    int rowsThis = data.size();
    int colsThis = rowsThis > 0 ? data[0].size() : 0;
    if (colsThis != rowsOther) {
        throw std::runtime_error("Invalid matrices for multiplication.");
    }

    Matrix product(rowsThis, colsOther);

    for (int r = 0; (r < rowsThis); r++) {
        product.data[r].data()[0:colsOther] = 0;
        for (int k = 0; (k < colsThis); k++) {
            product.data[r].data()[0:colsOther] += (*this).data[r][k] * other.data[k].data()[0:colsOther];
        }
    }

    return product;
}

如果Matrix 有计算data[i].data() 的方法,那么长线会更短。

【讨论】:

  • 好的,我尝试了第一个示例,但性能仍然慢得多。我已经确认了基准测试的完整性并提供了矩阵大小的编译时间常数。我对你的第二个例子感到困惑。
  • 发布完整的基准测试对您来说实用吗?您使用的是什么编译器(及其版本)?
  • 我剪掉了不相关的方法,添加了上面的文件。
  • Matrix.h 引用了一个未定义的“Val.h”。 icc 提示:如果您在与编译相同的平台上运行,请在命令行中添加“-xHost”。这样,如果您的平台有 AVX,您就可以获得 AVX,而无需任何 CPUID 检查开销。
  • 我无法复制减速。这是在 Haswell 处理器上运行: $ make icpc -g -Wall -O2 -xHost -std=c++11 -DSERIAL Matrix.cpp MatrixTest.cpp -o s.x icpc -g -Wall -O2 -xHost -std=c ++11 -DCILK Matrix.cpp MatrixTest.cpp -o c.x time s.x Diag sum: 1922000768 52.56user 0.01system 0:52.56elapsed 100%CPU (0avgtext+0avgdata 48356maxresident)k 0inputs+0outputs (0major+12176minor)xpagefaults 0swaps time.诊断总和:1922000768 52.41user 0.00system 0:52.40elapsed 100%CPU (0avgtext+0avgdata 48360maxresident)k 0inputs+0outputs (0major+12178minor)pagefaults 0swaps
猜你喜欢
  • 2020-07-19
  • 2018-03-17
  • 2021-10-11
  • 1970-01-01
  • 1970-01-01
  • 2017-11-04
  • 1970-01-01
  • 1970-01-01
  • 2020-04-27
相关资源
最近更新 更多