【问题标题】:share object between worker processes在工作进程之间共享对象
【发布时间】:2017-12-15 11:10:20
【问题描述】:

我想在许多不同的工作进程上运行f(x),这些工作进程运行一台(多于一台的奖励积分)远程机器,其中x 是一个大对象。

我的交互式 R 会话在 node0 上运行,并且我使用 parallel 库,所以我执行以下操作:

library(parallel)

cl <- makeCluster(rep("node1", times = 64))
clusterExport(cl, "x")
clusterExport(cl, "f")

clusterEvalQ(cl, f(x))

问题是发送x需要相当长的时间,因为它是通过网络连接单独传输到每个工作进程的,而主进程运行的机器是分开的。

问题: 是否可以只向每个节点发送一次x 并让工作进程在本地复制它?

【问题讨论】:

  • 您确定每次迭代都发送对象吗?
  • 对象在clusterExport(cl, "x")这一行中一个接一个地发送给每个工作进程,这很慢,因为它是通过网络连接发生的。发送一次就足够了,然后将其从一个工作进程复制到另一个工作进程的内存中。

标签: r parallel-processing


【解决方案1】:

假设主服务器和远程主机之间的连接是瓶颈,您可以将一个副本传输到第一个工作人员,然后将其缓存到文件中,并让其他工作人员从该缓存文件中读取数据。比如:

library("parallel")

## Large data object
x <- 1:1e6
f <- function(x) mean(x)

## All N=64 workers are on the same host
cl <- makeCluster(rep("node1", times = 64))

## Send function
clusterExport(cl, "f")

## Send data to first worker (over slow connection)
clusterExport(cl[1], "x")

## Save to cache file (on remote machine)
cachefile <- clusterEvalQ(cl[1], {
  saveRDS(x, file = (f <- tempfile())); f
})[[1]]

## Load cache file into remaining workers
clusterExport(cl[-1], "cachefile")
clusterEvalQ(cl[-1], { x <- readRDS(file = cachefile); TRUE })

# Resolve function on all workers
y <- clusterEvalQ(cl, f(x))

【讨论】:

  • 当然可以,我希望有一些管道/插座魔术来避免通过硬盘。
【解决方案2】:

这是一个使用 fifos 的版本,我不确定它的可移植性如何,在 Linux 下工作,我不确定这与 @HenrikB 的答案在性能方面如何比较:

library(parallel)

# create a very large cluster on a single (remote) node:
cl <- makePSOCKcluster(3)

# create a very large object
o <- 1:10

# create a fifo on the node and retrieve the name
fifo_name <- clusterEvalQ(cl[1], {
                        fifo_name <- tempfile()
                        system2("mkfifo", fifo_name)
                        fifo_name
})[[1]]

# send the very large object to one process on the node and the name of the fifo to all nodes
clusterExport(cl[1], "o")
clusterExport(cl, "fifo_name")

# does the actual sharing through the fifo
# note that a fifo has to be opened for reading 
# before writing on it
for(i in 2:length(cl)) {
  clusterEvalQ(cl[i], { ff <- fifo(fifo_name, "rb")  })
  clusterEvalQ(cl[1], { ff <- fifo(fifo_name, "wb")
                        saveRDS(o, ff)
                        close(ff)                    })
  clusterEvalQ(cl[i], { o <- readRDS(ff)
                        close(ff)                    })
}

# cleanup
clusterEvalQ(cl[1], {   unlink(fifo_name)            })

# check if everything is there
clusterEvalQ(cl, exists("o"))

# now you can do the actual work
...

【讨论】:

  • data.table 失败,显然这不适用于任何对象类型。
  • 请扩展您的“这对于 data.table ... 失败”声明。支持/解释这一点很重要,这样其他人就不会得出错误的结论。难道你只需要一个library(data.table)worker? (这在cran.r-project.org/web/packages/future/vignettes/… 中有解释)
猜你喜欢
  • 2012-07-22
  • 2021-10-21
  • 2016-02-05
  • 2017-04-22
  • 1970-01-01
  • 2019-02-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多