【问题标题】:Identify if in parallel call识别是否并行调用
【发布时间】:2017-03-18 21:39:15
【问题描述】:

我有一个通用的分块功能,可以将大调用分解成更小的部分并并行运行它们。

chunk_it <- function(d, n, some_fun) {
  # run n chunks of d in parallel
  dat <- foreach(...) %doPar% {
   some_fun(...)
  }
}

我想让这个通用分块函数可以识别它是否被一个已经并行的进程调用(在我的术语中是分块的)

chunked_highlevel <- function(d, n, some_fun) {
  # run n chunks of d in parallel
  ...
  chunk_it(lowerlevel_d, n) # do not chunk! 
}

我希望在这里发生的是,如果我在更高级别对进程进行了分块,它不会在较低级别激活分块功能。

有没有办法确定您何时已经处于并行进程中?

所以,我们可以这样编码:

chunk_it <- function(d, n, some_fun) {
  # run n chunks of d in parallel
  if(!already_parallel) {
   dat <- foreach(...) %doPar% {
    some_fun(...)
   }
  } else {
   dat <- some_fun()
  }
}

【问题讨论】:

标签: r parallel-processing


【解决方案1】:

我认为没有正式的方法可以做到这一点。但是,通常调用堆栈中应该有明显的代码,这使得您是否在并行代码中很明显。到目前为止,我所拥有的看起来像这样。它似乎适用于带有 MPI 或 SOCK 的 doSNOW,但可能需要对实现 %dopar% 的其他包进行调整。它还依赖于snow 的一些内部细节,这些细节可能会在未来的版本中发生变化。

library(doSNOW)
library(foreach)
my_fn <- function(bit) {
  is_parallel <- any(unlist(lapply(sys.calls(), function(cal) {
    as.character(cal[[1]]) %in% c("slaveLoop", "%dopar%")
    })))
  is_parallel
}

foreach(x = 1:2) %do% my_fn(x)
# [[1]]
# [1] FALSE
# 
# [[2]]
# [1] FALSE

cl <- makeCluster(2)
registerDoSNOW()
foreach(x = 1:2) %dopar% my_fn(x)
# [[1]]
# [1] TRUE
# 
# [[2]]
# [1] TRUE

【讨论】:

    【解决方案2】:

    future 包(我是作者)内置了对嵌套并行性的支持,因此您作为开发人员不必担心它,同时仍让最终用户完全有权控制并行化的方式和位置发生。

    这是来自future vignettes 之一的示例:

    library("future")
    library("listenv")
    x <- listenv()
    for (ii in 1:3) {
      x[[ii]] %<-% {
        y <- listenv()
        for (jj in 1:3) {
          y[[jj]] %<-% { ii + jj/10 }
        }
        y
      }
    }
    unlist(x)
    ## [1] 1.1 1.2 1.3 2.1 2.2 2.3 3.1 3.2 3.3
    

    注意未来的分配有两层 (%&lt;-%)。 默认情况下始终按顺序处理它们,除非另有说明。例如,要在本地计算机上并行处理未来分配的外循环,请使用:

    plan(multiprocess)
    

    这将导致x[[ii]] %&lt;-% { ... } for ii = 1, 2, 3 并行运行,而包含的y[[jj]] %&lt;-% { ... } 将按顺序运行。等效的完全显式设置是:

    plan(list(multiprocess, sequential))
    

    现在,如果您想顺序运行期货的外循环 (x[[ii]]) 和并行运行期货的内循环 (y[[jj]]),您可以指定:

    plan(list(sequential, multiprocess))
    

    在运行代码之前。

    顺便说一句,multiprocess 使用的并行进程数是future::availableCores()。将其视为parallel::detectCores(),但这对于mc.cores、HPC 集群环境等也很灵活。重要的是,future::availableCores() 将返回1,如果它已经并行运行(“是并行子”)。这意味着如果你这样做:

    plan(list(multiprocess, multiprocess))
    

    future 的内层实际上只会看到一个内核。您可以将其视为一种内置的自动保护,可防止通过递归并行性错误地创建大量并行进程。

    您可以强制使用不同的设置(但不推荐)。例如,假设您希望外层同时运行四个并行任务,并且每个任务同时运行两个并行任务(在您的本地机器上),那么您可以使用:

    plan(list(
      tweak(multiprocess, workers = 4L),
      tweak(multiprocess, workers = 2L)
    ))
    

    这将最多同时运行 4*2 = 8 个并行任务(加上主进程)。

    如果你有一组可用的机器,你可以这样做:

    plan(list(
      tweak(cliuster, workers = c("machine1", "machine2", "machine3")),
      multiprocess
    ))
    

    这会将期货的外层 (x[[ii]]) 分配给这三台机器,而期货的内层 (y[[ii]]) 将使用这些机器上的所有可用内核并行运行。

    请注意代码没有改变 - 只有设置(= plan() 调用)。这是本着“一次编写,随处运行”的精神。您可以使用许多不同的未来策略设置;查看未来包的小插曲。

    现在,如果你想使用foreach() 怎么办?您可以使用在未来框架之上工作的 doFuture %dopar% 适配器。例如,

    library("doFuture")
    registerDoFuture()
    
    some_fun <- function(j) {
      list(j = j, pid.j = Sys.getpid())
    }
    
    my_fun <- function(i) {
      y <- foreach(j = 1:3) %dopar% { some_fun(j = j) }
      list(i = i, pid.i = Sys.getpid(), y = y)
    }
    
    x <- foreach(i = 1:3) %dopar% { my_fun(i = i) }
    

    运行上面的代码并查看str(x) 及其针对上面示例的不同plan():s 的不同PID。这将说明发生了什么。

    希望对你有帮助

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-11-03
      相关资源
      最近更新 更多