【问题标题】:Nested parallel processing with conditional logic error带有条件逻辑错误的嵌套并行处理
【发布时间】:2019-04-15 10:48:53
【问题描述】:

这个有点复杂,所以我认为不值得分享我正在使用的确切代码,但我应该能够使用伪代码很好地理解这一点:

一点背景: 本质上,我试图在嵌套的操作循环上进行并行计算。 我有两个大函数,第一个函数需要运行并返回 TRUE 才能运行第二个函数,如果第二个函数运行,它需要循环多次迭代。 现在这是一个嵌套循环,因为我需要针对各种场景多次运行上述整个操作。 我尝试使用的伪代码如下:

Output <- foreach(1 to “I”, .packages=packages, .combine=rbind) %:%  
    Run the first function  
    If the first function is false:  
        Print and record  
    Else:  
        Foreach(1 to J, .packages=packages, .combine=rbind) %dopar%{  
            Run the second function  
            Create df summarizing each loop of second function  
        }  

这是我正在尝试做的事情的简化版本以及我遇到的错误:

library(doParallel)
library(foreach)
func1 <- function(int1){
  results <- list(int1,TRUE)
  return(results)
}
func2 <- function(int2){
  return(int1/int2)
}

int1list <- seq(1,10)
int2list <- seq(1,15)

out <- foreach(i=1:length(int1list),.combine=rbind) %:%
  out1 <- func1(i)
  if(out1[[2]]==FALSE){
    print("fail")
    next
  } else{
    foreach(j=1:length(int2),.combine=rbind) %dopar% {
      int3 <- func2(j)
      data.frame("Scenario"=i,"Result"=int3)
    }
  }

错误:func1(i) 中的错误:找不到对象“i”

当我运行上面的代码时,它基本上告诉我它甚至找不到对象“I”,我认为这是因为我在最内层循环之外运行了调用“I”的东西。我之前已经能够让嵌套的并行循环工作,但是我没有任何需要在最内层循环之外运行的东西,所以我假设这是一个包不知道执行顺序的问题。

我有一个解决方法,我可以并行运行第一个函数,然后根据第一个循环的结果并行运行第二个函数(本质上是两个单独的循环而不是嵌套循环),但我想知道是否有一种方法可以让嵌套循环之类的东西工作,因为我认为它会更有效。在生产环境中运行时,此代码可能需要数小时才能运行,因此节省一些时间是值得的。

【问题讨论】:

  • conditional logic *ERROR* 部分出现在哪里?此外,已经并行化的代码中的并行化很可能最终会减慢整个代码的速度(由于拆分和合并操作变得非常昂贵)。
  • 当第一个函数运行时,错误开始起作用,因为“i”是函数调用的一部分。
  • 伪代码可能不够用,没有 R 代码时很难解决 R 错误。我怀疑这个伪代码很大程度上基于实际代码,所以我建议:提出两个 trivial 1-2 行函数(代替更复杂的函数)和一个可重现的问题,包括 I将来自。如果这是基于对某种大型数据集进行子集化,那么提供样本(结构相似或来自实际数据的样本)也可能会有所帮助。
  • 您会推荐哪种算法最有效的方式来执行上述操作?第一个函数需要运行并成功才能运行第二个函数,内部循环可能需要循环大约 15-25 次。外部循环可能会循环 10 到 500 次。
  • 这是一个很好的 r2evans 推荐,我会编辑更新。

标签: r parallel-processing


【解决方案1】:

我不是foreach 的专业人士,但有几件事很突出:

  • func2 引用了 int1int2 但它只给出了后者;这可能是您简化示例的产物,也许不是?
  • 你的代码需要用花括号括起来,即你需要从

    out <- foreach(i=1:length(int1list),.combine=rbind) %:%
      out1 <- func1(i)
      if(out1[[2]]==FALSE) ...
    

    out <- foreach(i=1:length(int1list),.combine=rbind) %:% {
      out1 <- func1(i)
      if(out1[[2]]==FALSE) ...
    }
    
  • foreach 的文档建议二元运算符 %:% 是在两个 foreach 调用之间使用的嵌套运算符,但您没有这样做。我想我可以让它与%do%(或%dopar%)一起正常工作
  • 我不认为prints 在并行foreach 循环中工作得很好......它可能在主节点上工作,但在其他所有节点上都没有,参考:How can I print when using %dopar%
  • 可能再次由于简化示例,您定义但实际上并未使用 int1list 的内容(只是它的长度),我将在此示例中进行补救
  • next 在“正常” R 循环中工作,而不是在这些专门的 foreach 循环中工作;不过,这不是问题,因为您的 if/else 结构提供了相同的效果

这是您的示例,稍作修改以考虑上述所有内容。我加UsedJ表示

library(doParallel)
library(foreach)
func1 <- function(int1){
  results <- list(int1,int1>2)
  return(results)
}
func2 <- function(int1,int2){
  return(int1/int2)
}
int1list <- seq(1,3)
int2list <- seq(1,5)
out <- foreach(i=1:length(int1list),.combine=rbind) %do% {
  out1 <- func1(int1list[i])
  if(!out1[[2]]){
    data.frame("Scenario"=i, "Result"=out1[[1]], UsedJ=FALSE)
    # next
  } else{
    foreach(j=1:length(int2list),.combine=rbind) %dopar% {
      int3 <- func2(out1[[1]], int2list[j])
      data.frame("Scenario"=i,"Result"=int3, UsedJ=TRUE)
    }
  }
}
out
#   Scenario Result UsedJ
# 1        1   1.00 FALSE
# 2        2   2.00 FALSE
# 3        3   3.00  TRUE
# 4        3   1.50  TRUE
# 5        3   1.00  TRUE
# 6        3   0.75  TRUE
# 7        3   0.60  TRUE

编辑

如果您没有看到并行化,可能是因为您还没有设置“集群”。基于foreach 使用%:% 运算符嵌套循环的方法,还对工作流程进行了一些其他更改以使其能够很好地并行化。

为了“证明”这是并行工作的,我添加了一些基于How can I print when using %dopar% 的日志记录(因为并行进程并不像人们希望的那样print)。

library(doParallel)
library(foreach)
Log <- function(text, ..., .port = 4000, .sock = make.socket(port=.port)) {
  msg <- sprintf(paste0(as.character(Sys.time()), ": ", text, "\n"), ...)
  write.socket(.sock, msg)
  close.socket(.sock)
}
func1 <- function(int1) {
  Log(paste("func1", int1))
  Sys.sleep(5)
  results <- list(int1, int1 > 2)
  return(results)
}
func2 <- function(int1, int2) {
  Log(paste("func2", int1, int2))
  Sys.sleep(1)
  return(int1 / int2)
}

日志代码的使用需要从该套接字读取的外部方式。我在这里使用带有ncat -k -l 4000 的netcat(nc 或Nmap 的ncat)。这项工作当然不需要工作,但在这里可以方便地查看事情的进展情况。 (注意:此侦听器/服务器需要在您尝试使用Log 之前运行。)

我无法让嵌套的“foreach -> func1 -> foreach -> func2”正确并行化 func2。根据睡眠,对func1 的三个调用应该需要 5 秒,对func2 的五个调用需要 2 秒(每批三个),但需要 10 秒(对@987654358 的三个并行调用@,然后连续调用五次func2):

system.time(
  out <- foreach(i=1:length(int1list), .combine=rbind, .packages="foreach") %dopar% {
    out1 <- func1(int1list[i])
    if (!out1[[2]]) {
      data.frame(Scenario=i, Result=out1[[1]], UsedJ=FALSE)
    } else {
      foreach(j=1:length(int2list), .combine=rbind) %dopar% {
        int3 <- func2(out1[[1]], int2list[j])
        data.frame(Scenario=i, Result=int3, UsedJ=TRUE)
      }
    }
  }
)
#    user  system elapsed 
#    0.02    0.00   10.09 

使用相应的控制台输出:

2018-11-12 11:51:17: func1 2
2018-11-12 11:51:17: func1 1
2018-11-12 11:51:17: func1 3
2018-11-12 11:51:23: func2 3 1
2018-11-12 11:51:24: func2 3 2
2018-11-12 11:51:25: func2 3 3
2018-11-12 11:51:26: func2 3 4
2018-11-12 11:51:27: func2 3 5

(注意顺序不保证。)

所以我们可以先把它分解成计算 func1 的东西:

system.time(
  out1 <- foreach(i = seq_along(int1list)) %dopar% {
    func1(int1list[i])
  }
)
#    user  system elapsed 
#    0.02    0.01    5.03 
str(out1)
# List of 3
#  $ :List of 2
#   ..$ : int 1
#   ..$ : logi FALSE
#  $ :List of 2
#   ..$ : int 2
#   ..$ : logi FALSE
#  $ :List of 2
#   ..$ : int 3
#   ..$ : logi TRUE

控制台:

2018-11-12 11:53:21: func1 2
2018-11-12 11:53:21: func1 1
2018-11-12 11:53:21: func1 3

然后处理func2 的东西:

system.time(
  out2 <- foreach(i = seq_along(int1list), .combine="rbind") %:%
    foreach(j = seq_along(int2list), .combine="rbind") %dopar% {
      Log(paste("preparing", i, j))
      if (out1[[i]][[2]]) {
        int3 <- func2(out1[[i]][[1]], j)
        data.frame(i=i, j=j, Result=int3, UsedJ=FALSE)
      } else if (j == 1L) {
        data.frame(i=i, j=NA_integer_, Result=out1[[i]][[1]], UsedJ=FALSE)
      }
    }
)
#    user  system elapsed 
#    0.03    0.00    2.05 
out2
#   i  j Result UsedJ
# 1 1 NA   1.00 FALSE
# 2 2 NA   2.00 FALSE
# 3 3  1   3.00 FALSE
# 4 3  2   1.50 FALSE
# 5 3  3   1.00 FALSE
# 6 3  4   0.75 FALSE
# 7 3  5   0.60 FALSE

两秒(第一批三个是1秒,第二批两个是1秒)是我所期望的。控制台:

2018-11-12 11:54:01: preparing 1 2
2018-11-12 11:54:01: preparing 1 3
2018-11-12 11:54:01: preparing 1 1
2018-11-12 11:54:01: preparing 1 4
2018-11-12 11:54:01: preparing 1 5
2018-11-12 11:54:01: preparing 2 1
2018-11-12 11:54:01: preparing 2 2
2018-11-12 11:54:01: preparing 2 3
2018-11-12 11:54:01: preparing 2 4
2018-11-12 11:54:01: preparing 2 5
2018-11-12 11:54:01: preparing 3 1
2018-11-12 11:54:01: preparing 3 2
2018-11-12 11:54:01: func2 3 1
2018-11-12 11:54:01: preparing 3 3
2018-11-12 11:54:01: func2 3 2
2018-11-12 11:54:01: func2 3 3
2018-11-12 11:54:02: preparing 3 4
2018-11-12 11:54:02: preparing 3 5
2018-11-12 11:54:02: func2 3 4
2018-11-12 11:54:02: func2 3 5

您可以看到func2 被正确调用了五次。不幸的是,您看到循环内部有很多“旋转”。当然,它实际上是一个无操作(如 2.05 秒运行时所证明的那样),因此节点上的负载可以忽略不计。

如果有人有办法避免这种不必要的旋转,我欢迎 cmets 或“竞争”的答案。

【讨论】:

  • 是的 - 为草率的样本道歉,你所有的假设都是正确的。您的代码有效并解决了我遇到的问题,但是在调整并运行它后,它并没有按照我的预期并行化工作。我打算让它尝试利用我服务器上的所有 8 个处理器来完成工作,但似乎这种嵌套策略导致算法执行 func1 后跟 func2 重复次数,然后再进入第二次迭代我(现在我想想这很有意义)。为此,执行 2 个单独的循环应该更有效。
  • 查看我的编辑......它很冗长,因为所有代码都不需要你,但我认为很清楚你可以丢弃什么以及你可能想要适应自己的代码。
  • 我无法让您的代码正常运行。我添加了一个带有 4 个内核的并行化来进行测试(cl
  • 我正在开发一个类似于您发布的第二个版本的版本(因为我同意,第一个版本将完全按照它正在做的事情,这并不理想)。但是,我只对 func2 使用了一个循环,因为需要根据 func1 的结果提取一些数据才能使 func2 工作。我正在尝试确定这是否可以在嵌套并行循环中完成,或者我是否应该在顺序 func1 循环中坚持 func2 的当前单个并行循环。
  • 对于您的错误,我应该在这里提到(并且在提供的链接中提到)需要启动 netcat 侦听器(例如,ncat -k -l 4000首先 .如果这是唯一的问题,那么生产中不需要任何代码,只是为了解释性能和并行性。
【解决方案2】:

我感谢 r2evans 提供的帮助,虽然由于我缺乏经验并且无法弄清楚如何让 ncat 在我的计算机上工作,我实际上无法复制他的工作,但他帮助我意识到我原来的方法不会t 工作以及拆分为两个单独的 foreach 并行循环,此时我已经得到了一个工作生产版本。

这是最初提出的解决方案:

library(doParallel)
library(foreach)

cl <- makeCluster(detectCores())
registerDoParallel(cl)

func1 <- function(int1){
  results <- list(int1,int1>2)
  return(results)
}
func2 <- function(int1,int2){
  return(int1/int2)
}
int1list <- seq(1,3)
int2list <- seq(1,5)
out <- foreach(i=1:length(int1list),.combine=rbind) %do% {
  out1 <- func1(int1list[i])
  if(!out1[[2]]){
    data.frame("Scenario"=i, "Result"=out1[[1]], UsedJ=FALSE)
    # next
  } else{
    foreach(j=1:length(int2list),.combine=rbind) %dopar% {
      int3 <- func2(out1[[1]], int2list[j])
      data.frame("Scenario"=i,"Result"=int3, UsedJ=TRUE)
    }
  }
}

stopCluster(cl)
registerDoSEQ()

out

但是,这会导致循环等待 func1 的 func2 迭代的第一次迭代完成,然后再开始第二次迭代和 func1 的迭代。我选择将其分成两个单独的循环,如下所示:

library(doParallel)
library(foreach)

cl <- makeCluster(detectCores())
registerDoParallel(cl)

func1 <- function(int1){
  results <- list(int1,int1>2)
  return(results)
}
func2 <- function(int1,int2){
  return(int1/int2)
}
int1list <- seq(1,3)
int2list <- seq(1,5)

out1 <- foreach(i=1:length(int1list)) %dopar%{
  func1(i)
}

finalOut <- data.frame("Scenario"=integer(),"UsedJ"=logical(),"Result"=double())

for (i in 1:length(int1list)){
  if(out1[[2]]==FALSE){
    tempOut <- data.frame("Scenario"=i,"UsedJ"=FALSE,"Result"=NA)
  } else{
    tempOutput <- foreach(j=1:length(int2list),.combine=rbind) %dopar% {
      Result <- func2(i,j)
      data.frame("Scenario"=i,"UsedJ"=TRUE,"Result"=Result)
    }
  }
}

stopCluster(cl)
registerDoSEQ()

finalOut

这个算法似乎很符合我的目的。它的效率并不高,但它应该可以完成工作并且不会太浪费。

【讨论】:

  • 为了清楚起见...ncat(或nc)需要在终端中运行,而不是在R中...如果这很明显,我很抱歉,但新用户可能没有基于我模糊的描述的飞跃。无论如何,它的使用只是为了提供函数入口的指示,并行化策略绝不需要起作用。
  • 我试图在终端中运行它,但我要么在安装附带的终端中运行它(它立即关闭),要么从一个干净的终端运行它,它无法识别命令.我是否需要将其添加到系统路径中?我没有在我的工作计算机上添加程序到系统路径的管理权限(这很烦人)。
  • 不需要管理员权限,但我理解这种挫败感。我不明白你的意思"run it in the terminal that come with the installation" ... 安装netcat?它不包括终端。 “清洁终端”?如果在 Windows 上,然后“开始 > 运行 > cmd”,转到正确的目录,输入 ncat -k -l 4000,它应该“什么也不做”并且不返回。如果在 linux 上,打开一个 xterm,输入path/to/ncat -k -l 4000,应该“什么都不做”(并且不返回)。无论如何,并行化技术(没有Log)是否按预期工作。我们可以停止这个线程,不需要强制netcat。 :-)
猜你喜欢
  • 2023-04-01
  • 1970-01-01
  • 2019-03-03
  • 2014-04-09
  • 1970-01-01
  • 2015-10-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多