【问题标题】:Replacement for parallel plyr with doMC用 doMC 替代并行 plyr
【发布时间】:2018-05-15 19:35:02
【问题描述】:

考虑对 data.frame 的标准分组操作:

library(plyr)
library(doMC)
library(MASS) # for example

nc <- 12
registerDoMC(nc)

d <- data.frame(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

res <- ddply(d, .(g), function(d_group) {
   # slow, complicated operations on d_group
}, .parallel = FALSE)

只需编写.parallel = TRUE 即可充分利用多核设置。这是我最喜欢的 plyr 功能之一。

但是随着 plyr 被弃用(我认为)并基本上被 dplyr、purrr 等取代,并行处理的解决方案变得更加冗长:

library(dplyr)
library(multidplyr)
library(parallel)
library(MASS) # for example

nc <- 12

d <- tibble(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

cl <- create_cluster(nc)
set_default_cluster(cl)
cluster_library(cl, packages = c("MASS"))
cluster_copy(cl, obj = y)

d_parts <- d %>% partition(g, cluster = cl)
res <- d_parts %>% collect() %>% ungroup()

rm(d_parts)
rm(cl)

考虑到循环内需要的每个包和对象都需要自己的cluster_* 命令将其复制到节点上,您可以想象这个示例需要多长时间。非并行化的 plyr 到 dplyr 的转换只是一个简单的 dplyr::group_by 构造,不幸的是没有简洁的方法可以对其进行并行处理。所以,我的问题是:

  • 这实际上是将我的代码从 plyr 转换为 dplyr 的首选方式吗?
  • 在 plyr 的幕后发生了什么样的魔力,使得开启并行处理变得如此容易?是否有理由将此功能特别难以添加到 dplyr 中,而这就是它尚不存在的原因?
  • 我的两个示例在代码执行方式方面是否存在根本不同?

【问题讨论】:

  • 关于您的第三个问题:我会说是的。您的plyr 示例使用doMC,即multicore 的后端foreach,即:forking。您的multidplyr 示例使用默认为parallel::makePSOCKclustercreate_cluster,即:Parallel SOCket Cluster
  • 关于您的第二个问题:如果您只调用 partition() 而没有提前设置集群,就会发生同样的魔法:plyr 依赖于先前注册的 foreach 后端 (@987654336 @),没有集群的multidplyr::partition() 隐式依赖create_cluster(),但如果一个已经注册的后端可能会检测到另一个后端(不过,我还没有检查,请参阅print(multidplyr:::cluster_exists)))。 multidplyr 小插图的第一个示例说明了这种无需预先设置即可简单调用 partition() 的功能。
  • 关于您的第一个问题:据我所知,从文档和我自己的实验来看,multidplyr 不允许像 plyr 那样分叉,只有 PSOCK

标签: r dplyr plyr tidyverse multidplyr


【解决方案1】:
  1. 我认为没有一种真正的“首选”方式可以将 {plyr} 代码转换为 {dplyr}。

  2. 在 cmets 中,@Aurèle 在描述 {plyr} 和 {doMC} 之间的联系方面做得比我做得更好。发生的一件事是激励措施发生了一些变化。 {doMC} 来自 Revolution Analytics(已被 Microsoft 收购)。但是开发 dplyr 的 Hadley 目前在 RStudio 工作。这两家公司在 IDE 领域展开竞争。因此,它们的软件包无法很好地配合使用,这可能是很自然的。我看到对 RStudio 的强烈支持的唯一形式是 {sparklyr},他们已经使设置相对“容易”。但是,我真的不推荐使用 Spark 为单台机器进行并行处理。

  3. @Aurèle 再次很好地解释了执行差异。您的新代码使用 PSOCK 集群,旧代码使用分叉。分叉使用写时复制模式来访问 RAM,因此并行进程可以在分叉后立即访问相同的数据开始。 PSOCK 集群就像生成 R 的新副本 - 它们必须加载库并接收数据的显式副本。

您可以使用类似...的模式

library(dplyr)
library(purrr)
library(future)
plan(multicore)
options(mc.cores = availableCores())
d <- data.frame(x = 1:8, g = c("group1", "group2", "group3", "group4"))
y <- "some global object"


split(d, d$g) %>% 
  map(~ future({Sys.sleep(5);mean(.x$x)})) %>% 
  map_df(~value(.x))

...在map_df 步骤上进行一些并行处理。请注意,在 {purrr} 下,~ 是匿名函数语法,其中 .x 是已映射的值。

如果你喜欢危险的生活,你可以通过使用 {purrr} 中的私有方法来创建类似的版本而不使用 {future}

mcmap <- function(.x, .f, ...) {
  .f <- as_mapper(.f, ...)
  mclapply(.x, function(.x) {
    force(.f)
    .Call(purrr:::map_impl, environment(), ".x", ".f", "list")
  }) %>%
    map(~ .x[[1]])
}

【讨论】:

  • 感谢您的解释。我还没有尝试过代码,但是 purrr+future 可能是一个不错的解决方案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-05-14
  • 2020-03-07
  • 1970-01-01
  • 2013-09-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多