【问题标题】:Rcpp Parallel or openmp for matrixvector productRcpp Parallel 或 openmp 用于 matrixvector 产品
【发布时间】:2018-09-17 04:08:12
【问题描述】:

我正在尝试对 Conjugate gradient 的朴素并行版本进行编程,所以我从简单的 Wikipedia 算法开始,我想将 dot-productsMatrixVector 产品更改为相应的并行版本,Rcppparallel 文档有dot-product 的代码使用 parallelReduce;我想我会在我的代码中使用那个版本,但我正在尝试进行 MatrixVector 乘法,但与 R 基础(无并行)相比,我没有取得好的结果

并行矩阵乘法的一些版本:使用 OpenMP、Rcppparallel、串行版本、带有犰狳的串行版本以及基准

// [[Rcpp::depends(RcppParallel)]]
#include <Rcpp.h>
#include <RcppParallel.h>
#include <numeric>
// #include <cstddef>
// #include <cstdio>
#include <iostream>
using namespace RcppParallel;
using namespace Rcpp;

struct InnerProduct : public Worker
{   
   // source vectors
   const RVector<double> x;
   const RVector<double> y;

   // product that I have accumulated
   double product;

   // constructors
   InnerProduct(const NumericVector x, const NumericVector y) 
      : x(x), y(y), product(0) {}
   InnerProduct(const InnerProduct& innerProduct, Split) 
      : x(innerProduct.x), y(innerProduct.y), product(0) {}

   // process just the elements of the range I've been asked to
   void operator()(std::size_t begin, std::size_t end) {
      product += std::inner_product(x.begin() + begin, 
                                    x.begin() + end, 
                                    y.begin() + begin, 
                                    0.0);
   }

   // join my value with that of another InnerProduct
   void join(const InnerProduct& rhs) { 
     product += rhs.product; 
   }
};

struct MatrixMultiplication : public Worker
{
   // source matrix
   const RMatrix<double> A;

    //source vector
   const RVector<double> x;

   // destination matrix
   RMatrix<double> out;

   // initialize with source and destination
   MatrixMultiplication(const NumericMatrix A, const NumericVector x, NumericMatrix out) 
     : A(A), x(x), out(out) {}

   // take the square root of the range of elements requested
   void operator()(std::size_t begin, std::size_t end) {
      for (std::size_t i = begin; i < end; i++) {
            // rows we will operate on
            //RMatrix<double>::Row rowi = A.row(i);
            RMatrix<double>::Row rowi = A.row(i);

            //double res = std::inner_product(rowi.begin(), rowi.end(), x.begin(), 0.0);
            //Rcout << "res" << res << std::endl;
            out(i,1) = std::inner_product(rowi.begin(), rowi.end(), x.begin(), 0.0);
            //Rcout << "res" << out(i,1) << std::endl;
      }
    }  
};

// [[Rcpp::export]]
double parallelInnerProduct(NumericVector x, NumericVector y) {

   // declare the InnerProduct instance that takes a pointer to the vector data
   InnerProduct innerProduct(x, y);

   // call paralleReduce to start the work
   parallelReduce(0, x.length(), innerProduct);

   // return the computed product
   return innerProduct.product;
}
//librar(Rbenchmark)

// [[Rcpp::export]]
NumericVector matrixXvectorRcppParallel(NumericMatrix A, NumericVector x) {

   // // declare the InnerProduct instance that takes a pointer to the vector data
   // InnerProduct innerProduct(x, y);
   int nrows = A.nrow();
   NumericVector out(nrows);
   for(int i = 0; i< nrows;i++ )
   {
      out(i) = parallelInnerProduct(A(i,_),x);
   }
   // return the computed product
   return out;
}

// [[Rcpp::export]]
arma::rowvec matrixXvectorParallel(arma::mat A, arma::colvec x){
    arma::rowvec y = A.row(0)*0;
    int filas = A.n_rows;
    int columnas = A.n_cols;
    #pragma omp parallel for
    for(int j=0;j<columnas;j++)
    {
        //y(j) = A.row(j)*x(j))
        y(j) = dotproduct(A.row(j),x);
    }
    return y;
} 

arma::mat matrixXvector2(arma::mat A, arma::mat x){
  //arma::rowvec y = A.row(0)*0;
  //y=A*x;
  return A*x;
}

arma::rowvec matrixXvectorParallel2(arma::mat A, arma::colvec x){
    arma::rowvec y = A.row(0)*0;
    int filas = A.n_rows;
    int columnas = A.n_cols;

 #pragma omp parallel for
    for(int j = 0; j < columnas ; j++){
        double result = 0;
        for(int i = 0; i < filas; i++){
                result += x(i)*A(j,i);   
        }
        y(j) = result;
    }
    return y;
}

基准测试

                             test replications elapsed relative user.self sys.self user.child sys.child
1                         M %*% a           20   0.026    1.000     0.140    0.060          0         0
2 matrixXvector2(M, as.matrix(a))           20   0.040    1.538     0.101    0.217          0         0
4    matrixXvectorParallel2(M, a)           20   0.063    2.423     0.481    0.000          0         0
3     matrixXvectorParallel(M, a)           20   0.146    5.615     0.745    0.398          0         0
5 matrixXvectorRcppParallel(M, a)           20   0.335   12.885     2.305    0.079          0         0

我目前的最后一次尝试是使用带有 Rcppparallel 的 parallefor,但我遇到了内存错误,我不知道问题出在哪里

// [[Rcpp::export]]
NumericVector matrixXvectorRcppParallel2(NumericMatrix A, NumericVector x) {

   // // declare the InnerProduct instance that takes a pointer to the vector data
   int nrows = A.nrow();
   NumericMatrix out(nrows,1); //allocar mempria de vector de salida
   //crear worker
   MatrixMultiplication matrixMultiplication(A, x, out);


   parallelFor(0,A.nrow(),matrixMultiplication);

   // return the computed product
   return out;
}    

我注意到的是,当我使用 htop 在终端中检查处理器的工作方式时,当我使用 R-base 应用传统的矩阵向量乘法时,我在 htop 中看到,即使用所有处理器,所以矩阵乘法默认并行执行?因为理论上,如果是串行版本,则应该只有一个处理器可以工作。

如果有人知道哪个是更好的路径,OpenMP 或 Rcppparallel,或者其他方式,这给我比 R-base 的明显串行版本更好的性能。

目前共轭梯度的序列号

// [[Rcpp::export]]
arma::colvec ConjugateGradient(arma::mat A, arma::colvec xini, arma::colvec b, int num_iteraciones){
    //arma::colvec xnew = xini*0 //inicializar en 0's
    arma::colvec x= xini; //inicializar en 0's
    arma::colvec rkold = b - A*xini;
    arma::colvec rknew = b*0;
    arma::colvec pk = rkold;
    int k=0;
    double alpha_k=0;
    double betak=0;
    double normak = 0.0;

    for(k=0; k<num_iteraciones;k++){
         Rcout << "iteracion numero " << k << std::endl;
        alpha_k =  sum(rkold.t() * rkold) / sum(pk.t()*A*pk); //sum de un elemento para realizar casting
        (pk.t()*A*pk);
        x = x+ alpha_k * pk;
        rknew = rkold - alpha_k*A*pk;
        normak =  sum(rknew.t()*rknew);
        if( normak < 0.000001){
            break;
        }
        betak = sum(rknew.t()*rknew) / sum( rkold.t() * rkold );

        //actualizar valores para siguiente iteracion
        pk = rknew + betak*pk;
        rkold = rknew;

    }

    return x;

}

我不知道在 R 中使用 BLAS,感谢 Hong Ooi 和 tim18,所以新的基准测试使用 option(matprod="internal") 和 option(matprod="blas")

options(matprod = "internal")
res<-benchmark(M%*%a,matrixXvector2(M,as.matrix(a)),matrixXvectorParallel(M,a),matrixXvectorParallel2(M,a),matrixXvectorRcppParallel(M,a),order="relative",replications = 20)
res

                             test replications elapsed relative user.self sys.self user.child sys.child
2 matrixXvector2(M, as.matrix(a))           20   0.043    1.000     0.107    0.228          0         0
4    matrixXvectorParallel2(M, a)           20   0.069    1.605     0.530    0.000          0         0
1                         M %*% a           20   0.072    1.674     0.071    0.000          0         0
3     matrixXvectorParallel(M, a)           20   0.140    3.256     0.746    0.346          0         0
5 matrixXvectorRcppParallel(M, a)           20   0.343    7.977     2.272    0.175          0         0

选项(matprod="blas")

options(matprod = "blas")

res<-benchmark(M%*%a,matrixXvector2(M,as.matrix(a)),matrixXvectorParallel(M,a),matrixXvectorParallel2(M,a),matrixXvectorRcppParallel(M,a),order="relative",replications = 20)
res
                             test replications elapsed relative user.self sys.self user.child sys.child
1                         M %*% a           20   0.021    1.000     0.093    0.054          0         0
2 matrixXvector2(M, as.matrix(a))           20   0.092    4.381     0.177    0.464          0         0
5 matrixXvectorRcppParallel(M, a)           20   0.328   15.619     2.143    0.109          0         0
4    matrixXvectorParallel2(M, a)           20   0.438   20.857     3.036    0.000          0         0
3     matrixXvectorParallel(M, a)           20   0.546   26.000     3.667    0.127          0         0

【问题讨论】:

  • 1.您正在测试的那些矩阵有多大? 2. 您是否使用多线程 BLAS(例如 Microsoft R 附带的那个)?
  • 正如之前的海报所暗示的,有足够多的多线程 blas 实现可用,我们无需帮助您了解 openmp 在这些情况下如何与各种编译器一起工作。有些是免费的,例如 mkl 和 acml,并且可能支持除 openmp 之外的其他线程模型。
  • 我正在使用 R 3.4.4(没有 Microsoft R)(我在 docker 容器 (hub.docker.com/r/rocker/verse) 中使用 R,因为我无法安装一些库),测试是一个 1200x1200 的矩阵乘以 1200 个元素的向量,据我所知,我没有使用 BLAS,但我的一个疑问是 R 默认情况下是否使用多线程来计算矩阵乘法,因为 htop 似乎注意到所有处理器的使用当我测试时。
  • 感谢 Hong Ooi 和 Tim18,我在文档中发现实际上正在使用 blas,我没有意识到这一点,所以我使用避免使用 blas 的选项运行了一个新基准,我将使用新值编辑我的帖子
  • 对于 Rcpp,您是否为 OpenMP 编辑了文件 cat .R/Makevars。我的文件只有一行CXXFLAGS= -Wall -std=c++14 -O3 -march=native -ffp-contract=fast -fopenmp

标签: openmp rcpp matrix-multiplication armadillo rcppparallel


【解决方案1】:

您已经发现,如果使用多线程 BLAS 实现,则基本 R 矩阵乘法可以是多线程的。 rocker/* docker 镜像就是这种情况,通常使用 OpenBLAS。

此外,(Rcpp)Armadillo 已经使用 R 使用的 BLAS 库(在本例中为多线程 OpenBLAS)以及 OpenMP。所以你的“串行”版本实际上是多线程的。您可以在htop 中使用足够大的矩阵作为输入来验证这一点。

顺便说一句,在我看来,你正在尝试做的事情就像 premature optimization

【讨论】:

  • 非常感谢拉尔夫,我按照你之前所说的做了,我发布的图片就是所显示的,但很高兴知道为什么,我认为你的回答会解决我的疑问,是的,你关于我正在做的事情是正确的还为时过早,但我不了解算法的块版本,所以现在我试图只并行产品和点产品,但很高兴知道给定的摇杆默认情况下使用 OpenBLAS 多线程,我试图做的事情是无用的,所以我需要尝试不同的东西
猜你喜欢
  • 1970-01-01
  • 2019-05-07
  • 1970-01-01
  • 1970-01-01
  • 2012-07-22
  • 2017-05-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多