【问题标题】:parallel computing with foreach Rcpp and RNG much slower than expected使用 foreach Rcpp 和 RNG 进行并行计算比预期慢得多
【发布时间】:2018-08-21 14:01:37
【问题描述】:

我有一个 Rcpp 函数,它输出一个我想保存为 R 对象的大矩阵。我的想法是使用我的 Rcpp 函数与包 foreach 并行来加快速度。

对于相同的矩阵大小,在我的 Windows 机器上使用 foreach 所需的时间大约是不使用 foreach 运行函数的五倍以上(不包括工人的设置)。 我知道有关并行执行非常小的任务的问题(例如Why is the parallel package slower than just using apply?)。此外,我愿意抛开并行运行随机数生成器的理论问题,因为结果可能不再是真正随机的。

由于我的子任务应该足够大,显然我编写的 Rcpp 函数不能很好地并行工作,但我不知道为什么。在 Rcpp 函数中使用 RNG 仅仅是一项无法并行化的任务吗?除此之外:foreach 中的子矩阵是否存在最优 i 和最优 ncol(此处为 n_bootstrap)?非常感谢任何帮助。如果您愿意,也请随意评论代码。

澄清:我编译了一个包并在foreach中使用了mypackage::funC

这是 R 中的示例代码:

y <- funC(n_bootstrap = 250, n_obs_censusdata = 300000,
          locationeffects = as.numeric(1:200), 
          residuals = as.numeric(1:20000),
          X = matrix(as.numeric(1:3000000), ncol = 10), 
          beta_sample = matrix(as.numeric(1:2500), ncol = 250))

并行:

no_cores <- parallel::detectCores() - 2
cl <- parallel::makeCluster(no_cores)
doParallel::registerDoParallel(cl)

y <- foreach(i=1:5, .combine = "cbind") %dopar% {

  funC(n_bootstrap = 50,
       n_obs_censusdata = 300000, locationeffects = as.numeric(1:200), 
       residuals = as.numeric(1:20000), 
       X = matrix(as.numeric(1:3000000), ncol = 10), 
       beta_sample = matrix(as.numeric(1:2500), ncol = 250))
                       }
parallel::stopCluster(cl)

添加:使用 bigstatsr

y <- bigstatsr::FBM(nrow = 300000, ncol = 250, type = "double")
bigstatsr::big_apply(y, a.FUN = function(y, ind, fun) {
          y[, ind] <- fun(n_bootstrap = length(ind),
                                    n_obs_censusdata = 300000,
                                    locationeffects = as.numeric(1:200),
                                    residuals = as.numeric(1:20000),
                                    X = matrix(as.numeric(1:3000000), ncol = 10), 
                                    beta_sample =  matrix(as.numeric(1:2500), ncol = 250))
          NULL
        }, a.combine = 'c', ncores = bigstatsr::nb_cores(), fun = funC)+

这里是 Rcpp 代码:

// -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; indent-tabs-mode: nil; -*-

#include <RcppEigen.h>
#include <random>

using namespace Rcpp;
// [[Rcpp::depends(RcppEigen)]]
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::export]]
SEXP funC(const int n_bootstrap,
          const int n_obs_censusdata, 
          const Eigen::Map<Eigen::VectorXd> locationeffects, 
          const Eigen::Map<Eigen::VectorXd> residuals,
          const Eigen::Map<Eigen::MatrixXd> X, 
          const Eigen::Map<Eigen::MatrixXd> beta_sample)
{

  // --------- create random sample of locations and of residuals --------- //

    // initialise random seeds 
  std::random_device rd; // used to obtain a seed for the number engine
  std::mt19937 gen(rd()); // Mersenne Twister engine 

  // initialize distributions for randam locations and residuals
  const int upperlocation = locationeffects.size();
  const int upperresiduals = residuals.size();

  std::uniform_int_distribution<> distrloc(1, upperlocation);
  std::uniform_int_distribution<> distrres(1, upperresiduals);

  // initialize and fill matrix for randam locations and residuals 
  Eigen::MatrixXd LocationEffectResiduals(n_obs_censusdata, n_bootstrap);

  for (int i=0; i<n_obs_censusdata; ++i)
    for (int j=0; j<n_bootstrap; j++)
      LocationEffectResiduals(i,j) = locationeffects[distrloc(gen)-1] + residuals[distrres(gen)-1]; // subtract 1 because in C++ indices start with 0

  // ----- create Xbeta ------- //
    Eigen::MatrixXd Xbeta = X * beta_sample;

  // ----- combine results ------- //
    Eigen::MatrixXd returnmatrix = Xbeta + LocationEffectResiduals;

  return Rcpp::wrap(returnmatrix);
}

【问题讨论】:

  • 不直接相关:with mingw std::random_device 实际上是确定性的,给你五个实例做同样的事情,c.f. sourceforge.net/p/mingw-w64/bugs/338
  • 啊,这点很好,谢谢。有没有办法可以改变 C++ 中的函数来解决这个问题?还是我必须求助于其他 R 包,如 doRNG?
  • 对于像 sample() 这样的 R 函数也是如此吗?显然,我可以使用 R 函数而不是 C++ 来实现相同的结果,但至少按顺序我发现这要快得多。如果我可以并行执行任务,也许切换回来可能是值得的
  • 如何并行执行?您的代码看起来像是在使用sourceCpp(),但这与并行使用不兼容,因为工作人员将无法使用funC。你机器上的no_cores 是什么?当您切换到使用 R 的 RNG 时,doRNG 应该可以工作。否则,请使用为并行工作设计的 RNG(例如 cran.r-project.org/package=sitmocran.r-project.org/package=dqrng),并结合用户提供的种子和工作人员编号。
  • 我编译了一个包并将它与 mypackage::funC 一起使用。 parallel::detectCores() 产生 8,所以 no_cores = 6。好的。因此,例如,我可以在 C++ 函数中创建一个 int 种子输入,然后确保调用的每个 funC 实例都获得不同的种子输入,对吧?

标签: r foreach rcpp


【解决方案1】:

在这里你想创建一个大矩阵。原则上可以将其分配给多个进程,但最终要承担合并结果的成本。我建议在这里使用“共享内存并行”。我使用OpenMP code from here 作为您的算法并行版本的起点:

// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::depends(dqrng)]]
#include <xoshiro.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>
// [[Rcpp::plugins(cpp11)]]
#include <random>

// [[Rcpp::export]]
Eigen::MatrixXd funD(const int n_bootstrap,
                     const int n_obs_censusdata, 
                     const Eigen::Map<Eigen::VectorXd> locationeffects, 
                     const Eigen::Map<Eigen::VectorXd> residuals,
                     const Eigen::Map<Eigen::MatrixXd> X, 
                     const Eigen::Map<Eigen::MatrixXd> beta_sample,
                     int ncores) {

  // --------- create random sample of locations and of residuals --------- //

  // initialise random seeds 
  std::random_device rd; // used to obtain a seed for the number engine
  dqrng::xoshiro256plus gen(rd());

  // initialize distributions for randam locations and residuals
  const int upperlocation = locationeffects.size();
  const int upperresiduals = residuals.size();

   // subtract 1 because in C++ indices start with 0
  std::uniform_int_distribution<> distrloc(0, upperlocation - 1);
  std::uniform_int_distribution<> distrres(0, upperresiduals - 1);

  // initialize and fill matrix for randam locations and residuals 
  Eigen::MatrixXd LocationEffectResiduals(n_obs_censusdata, n_bootstrap);

  #pragma omp parallel num_threads(ncores)
  {
    dqrng::xoshiro256plus lgen(gen);      // make thread local copy of rng 
    lgen.jump(omp_get_thread_num() + 1);  // advance rng by 1 ... ncores jumps 

    #pragma omp for
    for (int i=0; i<n_obs_censusdata; ++i)
      for (int j=0; j<n_bootstrap; j++)
        LocationEffectResiduals(i,j) = locationeffects[distrloc(lgen)] + residuals[distrres(lgen)];
  }  

  // ----- create Xbeta ------- //
  Eigen::MatrixXd Xbeta = X * beta_sample;

  // ----- combine results ------- //
  Eigen::MatrixXd returnmatrix = Xbeta + LocationEffectResiduals;

  return returnmatrix;
}

在我的双核 Linux 系统上,我的 funDncores = 1 比你的 funC 稍快,可能是因为使用的 RNG 更快。使用ncores = 2,它又增加了 30-40%。不错,因为并非所有代码都是并行执行的。我不知道这些天在 Windows 上的 OpenMP 性能有多好。改用RcppParallel 可能更有意义。但这需要对您的代码进行更多更改。

以上代码的来源是Rcpp::sourceCpp()。当你把它放入一个包中时,你应该使用

CXX_STD = CXX11
PKG_CXXFLAGS = $(SHLIB_OPENMP_CXXFLAGS)
PKG_LIBS = $(SHLIB_OPENMP_CXXFLAGS)

Makevars(.win)。请注意,根据WRE,如果 C++11 与 C++98 使用不同的编译器,这可能不会像预期的那样工作。 IIRC Solaris 是唯一在默认配置中出现这种情况的平台。所以对于内部包你应该没问题。

【讨论】:

  • 你太棒了,谢谢。我将在今天晚些时候尝试实现这一点。在这个例子中,我不必担心线程安全,因为我一次只写入一个元素,对吧?另外,我不必在 Rcpp 之外的 R 中设置任何工作人员,只需调用 X
  • @NikosBosse 线程安全是个问题。您必须使用线程安全的数据类型(Eigen 很好,AFAIK,Rcpp 类型不是),并且您不能为循环中的不同索引写入数据结构的相同元素。是的,无需设置集群或使用 foreach。不,由于需要设置一个包,我没有尝试并行 funC。
  • @NikosBosse 告诉我们最终结果如何。关于未来的问题:注意代码是否必须是包的一部分。不过,这使得创建minimal reproducible example 变得更加困难。一种方法是在 GitHub 或类似网站上创建一个存储库,因为它可以轻松安装。
  • @NikosBosse 我的代码用于sourceCpp()。在包中,Rcpp::dependesRcpp::plugins 属性无效。有关完整文档,请参阅 WRE。简而言之:将SHLIB_OPENMP_CXXFLAGS 添加到PKG_CXXFLAGSPKG_LIBS
  • @NikosBosse 此外,您应该取消注释 #CXX_STD = CXX11 行,以明确您使用的是 C++11。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-11-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-18
  • 2021-05-13
相关资源
最近更新 更多