future 包(我是作者)内置了对嵌套并行性的支持,因此您作为开发人员不必担心它,同时仍让最终用户完全有权控制并行化的方式和位置发生。
这是来自future vignettes 之一的示例:
library("future")
library("listenv")
x <- listenv()
for (ii in 1:3) {
x[[ii]] %<-% {
y <- listenv()
for (jj in 1:3) {
y[[jj]] %<-% { ii + jj/10 }
}
y
}
}
unlist(x)
## [1] 1.1 1.2 1.3 2.1 2.2 2.3 3.1 3.2 3.3
注意未来的分配有两层 (%<-%)。 默认情况下始终按顺序处理它们,除非另有说明。例如,要在本地计算机上并行处理未来分配的外循环,请使用:
plan(multiprocess)
这将导致x[[ii]] %<-% { ... } for ii = 1, 2, 3 并行运行,而包含的y[[jj]] %<-% { ... } 将按顺序运行。等效的完全显式设置是:
plan(list(multiprocess, sequential))
现在,如果您想顺序运行期货的外循环 (x[[ii]]) 和并行运行期货的内循环 (y[[jj]]),您可以指定:
plan(list(sequential, multiprocess))
在运行代码之前。
顺便说一句,multiprocess 使用的并行进程数是future::availableCores()。将其视为parallel::detectCores(),但这对于mc.cores、HPC 集群环境等也很灵活。重要的是,future::availableCores() 将返回1,如果它已经并行运行(“是并行子”)。这意味着如果你这样做:
plan(list(multiprocess, multiprocess))
future 的内层实际上只会看到一个内核。您可以将其视为一种内置的自动保护,可防止通过递归并行性错误地创建大量并行进程。
您可以强制使用不同的设置(但不推荐)。例如,假设您希望外层同时运行四个并行任务,并且每个任务同时运行两个并行任务(在您的本地机器上),那么您可以使用:
plan(list(
tweak(multiprocess, workers = 4L),
tweak(multiprocess, workers = 2L)
))
这将最多同时运行 4*2 = 8 个并行任务(加上主进程)。
如果你有一组可用的机器,你可以这样做:
plan(list(
tweak(cliuster, workers = c("machine1", "machine2", "machine3")),
multiprocess
))
这会将期货的外层 (x[[ii]]) 分配给这三台机器,而期货的内层 (y[[ii]]) 将使用这些机器上的所有可用内核并行运行。
请注意代码没有改变 - 只有设置(= plan() 调用)。这是本着“一次编写,随处运行”的精神。您可以使用许多不同的未来策略设置;查看未来包的小插曲。
现在,如果你想使用foreach() 怎么办?您可以使用在未来框架之上工作的 doFuture %dopar% 适配器。例如,
library("doFuture")
registerDoFuture()
some_fun <- function(j) {
list(j = j, pid.j = Sys.getpid())
}
my_fun <- function(i) {
y <- foreach(j = 1:3) %dopar% { some_fun(j = j) }
list(i = i, pid.i = Sys.getpid(), y = y)
}
x <- foreach(i = 1:3) %dopar% { my_fun(i = i) }
运行上面的代码并查看str(x) 及其针对上面示例的不同plan():s 的不同PID。这将说明发生了什么。
希望对你有帮助