【发布时间】:2016-02-24 14:21:40
【问题描述】:
我需要尽可能高效地计算乘积向量矩阵。具体来说,给定一个向量s 和一个矩阵A,我需要计算s * A。我有一个类 Vector 包装了一个 std::vector 和一个类 Matrix 也包装了一个 std::vector (为了提高效率)。
天真的方法(我目前正在使用的方法)是有类似的东西
Vector<T> timesMatrix(Matrix<T>& matrix)
{
Vector<unsigned int> result(matrix.columns());
// constructor that does a resize on the underlying std::vector
for(unsigned int i = 0 ; i < vector.size() ; ++i)
{
for(unsigned int j = 0 ; j < matrix.columns() ; ++j)
{
result[j] += (vector[i] * matrix.getElementAt(i, j));
// getElementAt accesses the appropriate entry
// of the underlying std::vector
}
}
return result;
}
它运行良好,耗时近 12000 微秒。注意向量s有499个元素,而A是499 x 15500。
下一步是尝试并行计算:如果我有N 线程,那么我可以给每个线程向量s 的一部分和矩阵A 的“对应”行。每个线程将计算一个 499 大小的 Vector,最终结果将是它们的条目总和。
首先,在Matrix 类中,我添加了一个方法来从Matrix 中提取一些行并构建一个更小的行:
Matrix<T> extractSomeRows(unsigned int start, unsigned int end)
{
unsigned int rowsToExtract = end - start + 1;
std::vector<T> tmp;
tmp.reserve(rowsToExtract * numColumns);
for(unsigned int i = start * numColumns ; i < (end+1) * numColumns ; ++i)
{
tmp.push_back(matrix[i]);
}
return Matrix<T>(rowsToExtract, numColumns, tmp);
}
然后我定义了一个线程例程
void timesMatrixThreadRoutine
(Matrix<T>& matrix, unsigned int start, unsigned int end, Vector<T>& newRow)
{
// newRow is supposed to contain the partial result
// computed by a thread
newRow.resize(matrix.columns());
for(unsigned int i = start ; i < end + 1 ; ++i)
{
for(unsigned int j = 0 ; j < matrix.columns() ; ++j)
{
newRow[j] += vector[i] * matrix.getElementAt(i - start, j);
}
}
}
最后我修改了上面展示的timesMatrix方法的代码:
Vector<T> timesMatrix(Matrix<T>& matrix)
{
static const unsigned int NUM_THREADS = 4;
unsigned int matRows = matrix.rows();
unsigned int matColumns = matrix.columns();
unsigned int rowsEachThread = vector.size()/NUM_THREADS;
std::thread threads[NUM_THREADS];
Vector<T> tmp[NUM_THREADS];
unsigned int start, end;
// all but the last thread
for(unsigned int i = 0 ; i < NUM_THREADS - 1 ; ++i)
{
start = i*rowsEachThread;
end = (i+1)*rowsEachThread - 1;
threads[i] = std::thread(&Vector<T>::timesMatrixThreadRoutine, this,
matrix.extractSomeRows(start, end), start, end, std::ref(tmp[i]));
}
// last thread
start = (NUM_THREADS-1)*rowsEachThread;
end = matRows - 1;
threads[NUM_THREADS - 1] = std::thread(&Vector<T>::timesMatrixThreadRoutine, this,
matrix.extractSomeRows(start, end), start, end, std::ref(tmp[NUM_THREADS-1]));
for(unsigned int i = 0 ; i < NUM_THREADS ; ++i)
{
threads[i].join();
}
Vector<unsigned int> result(matColumns);
for(unsigned int i = 0 ; i < NUM_THREADS ; ++i)
{
result = result + tmp[i]; // the operator+ is overloaded
}
return result;
}
它仍然可以工作,但现在需要将近 30000 微秒,几乎是以前的三倍。
我做错了吗?您认为有更好的方法吗?
编辑 - 使用“轻量级”VirtualMatrix
按照 Ilya Ovodov 的建议,我定义了一个类 VirtualMatrix,它包装了一个 T* matrixData,它在构造函数中被初始化为
VirtualMatrix(Matrix<T>& m)
{
numRows = m.rows();
numColumns = m.columns();
matrixData = m.pointerToData();
// pointerToData() returns underlyingVector.data();
}
然后有一种方法可以检索矩阵的特定条目:
inline T getElementAt(unsigned int row, unsigned int column)
{
return *(matrixData + row*numColumns + column);
}
现在执行时间更好(大约 8000 微秒),但可能还有一些改进。特别是现在的线程例程
void timesMatrixThreadRoutine
(VirtualMatrix<T>& matrix, unsigned int startRow, unsigned int endRow, Vector<T>& newRow)
{
unsigned int matColumns = matrix.columns();
newRow.resize(matColumns);
for(unsigned int i = startRow ; i < endRow + 1 ; ++i)
{
for(unsigned int j = 0 ; j < matColumns ; ++j)
{
newRow[j] += (vector[i] * matrix.getElementAt(i, j));
}
}
}
真正慢的部分是嵌套for 循环的部分。如果我删除它,结果显然是错误的,但会在不到 500 微秒内“计算”出来。这就是说现在传递参数几乎不需要时间,重要的部分实际上是计算。
根据你的说法,有什么方法可以让它更快?
【问题讨论】: