【问题标题】:R parallel: rbind parallely into separate data.framesR并行:rbind并行到单独的data.frames
【发布时间】:2015-05-22 12:07:03
【问题描述】:

以下代码在 Windows 和 Ubuntu 平台上产生不同的结果。我理解这是因为处理并行处理的方法不同。

总结:
我不能 insert / rbind 在 Linux 上并行处理数据(mclapplymcmapply),而 我可以在 Windows 上完成 .

感谢@Hong Ooi 指出mclapply 不能在Windows 上并行工作,但以下问题仍然有效。

当然,同一个data.frame没有多次插入,每次插入都在单独的data.frame中执行。

library(R6)
library(parallel)

# storage objects generator
cl <- R6Class(
    classname = "cl",
    public = list(
        data = data.frame(NULL),
        initialize = function() invisible(self),
        insert = function(x) self$data <- rbind(self$data, x)
    )
)

N <- 4L # number of entities
i <- setNames(seq_len(N),paste0("n",seq_len(N)))

# random data.frames
set.seed(1)
ldt <- lapply(i, function(i) data.frame(replicate(sample(3:10,1),sample(letters,1e5,rep=TRUE))))

# entity storage
lcl1 <- lapply(i, function(i) cl$new())
lcl2 <- lapply(i, function(i) cl$new())
lcl3 <- lapply(i, function(i) cl$new())

# insert data
invisible({
    mclapply(names(i), FUN = function(n) lcl1[[n]]$insert(ldt[[n]]))
    mcmapply(FUN = function(dt, cl) cl$insert(dt), ldt, lcl2, SIMPLIFY=FALSE)
    lapply(names(i), FUN = function(n) lcl3[[n]]$insert(ldt[[n]]))
})

### Windows

sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000

### Unix

sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
#n1 n2 n3 n4
# 0  0  0  0
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
#n1 n2 n3 n4
# 0  0  0  0
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000

还有问题:

如何在 Linux 平台上实现 rbind 到单独的 data.frames 中?

附:在我的情况下,像SQLite 这样的内存外存储不能被视为解决方案。

【问题讨论】:

  • 您确实意识到 mclapply 实际上并没有在 Windows 上并行运行,对吧?
  • 一个提示是查看data.table 包。我知道我的提示不能直接回答您的问题,但仍可能有助于提高性能。 data.table 包比基本 Rdata.frames 包更适合大型数据集(GB 范围)。
  • @RichardErickson 这是data.table,我翻译成data.frame只是为了提问;)
  • @RichardErickson,Jan 是 data.table 包及其不同扩展的主要开发者之一...
  • @DavidArenburg 肯定不是 main 开发者之一,我刚刚推送了几个小提交,你应该试试 :)

标签: r parallel-processing parallel-foreach r6 rparallel


【解决方案1】:

问题是mclapplymcmapply 不打算与有副作用的函数一起使用。您的函数正在修改列表中的对象,但mclapply 不会将修改后的对象发送回主进程:它只返回函数显式返回的值。这意味着当工作人员在mclapply 返回时退出时,您的结果将丢失。

通常我会将代码更改为不依赖于副作用,并返回已修改的对象。这是使用clusterApply 的一种方法,它也可以在 Windows 上并行工作:

library(R6)
library(parallel)
cl <- R6Class(
    classname = "cl",
    public = list(
        data = data.frame(NULL),
        initialize = function() invisible(self),
        insert = function(x) self$data <- rbind(self$data, x)))

N <- 4L # number of entities
i <- setNames(seq_len(N),paste0("n",seq_len(N)))
set.seed(1)
ldt <- lapply(i, function(i)
  data.frame(replicate(sample(3:10,1),sample(letters,1e5,rep=TRUE))))
nw <- 3  # number of workers
clust <- makePSOCKcluster(nw)
idx <- splitIndices(length(i), nw)
nameslist <- lapply(idx, function(iv) names(i)[iv])

lcl4 <- do.call('c', clusterApply(clust, nameslist, 
  function(nms, cl, ldt) {
    library(R6)
    lcl4 <- lapply(nms, function(n) cl$new())
    names(lcl4) <- nms
    lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
    lcl4
  }, cl, ldt))

如果您想创建一次对象列表,然后并行修改多个对象,则此方法不起作用。这也是可能的,但你必须有持久的工人。在这种情况下,您会在所有任务完成后从工作人员那里获取修改后的对象。不幸的是,mclapply 没有使用持久化的worker,所以在这种情况下你必须使用基于集群的函数,例如clusterApply。这是一种方法:

# Initialize the cluster workers
clusterEvalQ(clust, library(R6))
clusterExport(clust, c('cl', 'ldt'))
clusterApply(clust, nameslist, function(nms) {
  x <- lapply(nms, function(n) cl$new())
  names(x) <- nms
  assign('lcl4', x, pos=.GlobalEnv)
  NULL
})

# Insert data into lcl4 on each worker
clusterApply(clust, nameslist, function(nms) {
  lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
  NULL
})

# Concatenate lcl4 from each worker
lcl4 <- do.call('c', clusterEvalQ(clust, lcl4))

这和之前的方法很相似,只是将流程分为三个阶段:worker初始化、任务执行和结果检索。我还使用clusterExportclusterEvalQ 以更传统的方式初始化了worker。

【讨论】:

  • 对并行 R 的理解给我留下了深刻的印象!不知道RRO能不能简化api?
  • 意思是“Revolution R Open”?我没用过,所以不知道。
  • @jangorecki 你试过用data.table::rbindlist吗?
  • @RomanLuštrik 是的,但目标是将这项工作分布在一组机器上。我知道rbindlist 非常快。
【解决方案2】:

我认为mclapply 的Windows 版本正在运行,因为它将其工作委托给lapply。检查时序或 CPU 核心使用情况可以验证这一点。根据R source,Windows 的mclapplymcmapply 被替换为顺序版本。

看来,代码的并行化方式有问题,目前看不出具体是什么。

【讨论】:

  • mclapply 在 Linux 上使用 fork 系统调用来克隆当前的 R 进程。此调用在 Windows 上不存在,因此 mclapply 恢复为顺序行为。
猜你喜欢
  • 1970-01-01
  • 2020-11-28
  • 2011-11-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-03-28
  • 2013-11-30
相关资源
最近更新 更多