【Question title】: Using clustermq R package as a parallel backend for foreach
【Posted】: 2020-06-01 14:04:33
【Question】:

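I have started using the clustermq package as a parallel backend for a drake pipeline, and I am impressed by the performance improvements I have observed. I would like to evaluate clustermq / rzmq in settings outside of drake, but I can't get the foreach example from the User Guide (in the subsection titled "As parallel foreach backend") to work. What am I missing here?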

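In the example below, on my 4-core machine, I expect the code to run in about 5 seconds, but it takes about 20 seconds. When I use similar code for some heavy processing, I see only one core doing significant work.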

library(foreach)
(n_cores <- parallel::detectCores())
#> [1] 4
clustermq::register_dopar_cmq(n_jobs = n_cores)
system.time(foreach(i = seq_len(n_cores)) %dopar% Sys.sleep(5))
#> Submitting 4 worker jobs (ID: 6856) ...
#>    user  system elapsed 
#>   0.118   0.022  20.187
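The expectation above is back-of-the-envelope arithmetic: with one task per worker, wall-clock time should be about one task's duration, whereas a sequential backend runs the tasks back to back. A minimal sketch of that estimate (the helper name expected_elapsed is hypothetical, and it ignores worker start-up and communication overhead):

```r
# Hypothetical helper: ideal wall-clock time for n_tasks equal-length tasks
# spread over n_workers, ignoring start-up and communication overhead.
expected_elapsed <- function(n_tasks, n_workers, task_sec) {
  ceiling(n_tasks / n_workers) * task_sec
}

expected_elapsed(4, 4, 5)  # 4 workers in parallel: about 5 s
expected_elapsed(4, 1, 5)  # one task at a time (sequential): about 20 s
```

An observed elapsed time close to 20 s therefore suggests the four Sys.sleep(5) calls ran one after another.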

【Discussion】:

    Tags: r parallel-processing zeromq parallel-foreach


    【Answer 1】:

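    The author of the clustermq package kindly replied to the GitHub issue I filed about this. In short, there is an option, clustermq.scheduler, that selects the kind of scheduling clustermq uses. In my case the option was not set, so clustermq defaulted to LOCAL (i.e., sequential) scheduling. To process in parallel on the local machine, set clustermq.scheduler to "multicore" before running the %dopar% loop. Put together, this gives the following.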

    library(foreach)
    (n_cores <- parallel::detectCores())
    #> [1] 4
    clustermq::register_dopar_cmq(n_jobs = n_cores)
    options(clustermq.scheduler = "multicore")
    system.time(foreach(i = seq_len(n_cores)) %dopar% Sys.sleep(5))
    #> Submitting 4 worker jobs (ID: 7206) ...
    #> Running 4 calculations (1 objs/0 Mb common; 1 calls/chunk) ...
    #> Master: [5.6s 3.6% CPU]; Worker: [avg 1.3% CPU, max 2475916.0 Mb]
    #>    user  system elapsed 
    #>   0.188   0.065   5.693
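Because an unset option silently falls back to sequential execution, it can help to check it explicitly before running the loop. A minimal sketch in base R (the helper name cmq_scheduler_or_warn is hypothetical; it only reads the clustermq.scheduler option and does not call into clustermq):

```r
# Hypothetical guard: return the configured clustermq scheduler, or warn
# (and return NA) when the option is unset and jobs would run sequentially.
cmq_scheduler_or_warn <- function() {
  sched <- getOption("clustermq.scheduler")
  if (is.null(sched)) {
    warning("option 'clustermq.scheduler' is not set; ",
            "clustermq will fall back to LOCAL (sequential) execution")
    return(NA_character_)
  }
  sched
}

options(clustermq.scheduler = "multicore")
cmq_scheduler_or_warn()  # "multicore"
```

To make the setting persist across sessions, the same options() call can go in ~/.Rprofile.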
    

    【Comments】:

      【Answer 2】:

      The good news is that the R console reports the work units were dispatched to 4 worker jobs (ID: 6856) ...

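      To see how CPU usage gets accounted for (as reported by system.time()), test the code below. Online R code-execution services cannot load library(foreach) or library(doParallel), so the difference cannot be shown live online, but the code should make clear how to do this on a properly configured localhost R system: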

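      library(foreach)
      n_cores <- parallel::detectCores()
      options( clustermq.scheduler = "multicore" )        # unset => LOCAL, i.e. sequential
      clustermq::register_dopar_cmq( n_jobs = n_cores )   # register the parallel backend

      # Recursive factorial used only to burn CPU. NB: it masks base::factor()
      # in the global environment, and values overflow to Inf beyond 170!
      factor = function( x ) {
               if (      x < 3 ) return( x )
               else              return( x * factor( x - 1 ) )
      }
      
      # CPU-bound dummy workload (masks base::load()); the result is discarded
      load   = function( n ) {
               for ( l in 1:n ) {
                     base <- factor( 1078 )
                     for ( k in 2:1111 ){
                           base <- ( base + factor( k ) ) / factor( k )
                     }
               }
      }
      
      op <- options( digits.secs = 6 )                    # print Sys.time() with sub-second precision
      
      Sys.time()
      system.time(                                           load( 100 ) )   # serial baseline
      Sys.time()
      system.time( foreach( i = seq_len( n_cores ) ) %dopar% load( 100 ) )   # n_cores copies in parallel
      
      Sys.time()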
      
      # ___________________________________________________________________
      # tio.run
      # ___________________________________________________________________
      #
      # factor(   78 )                                                     user: 0.016 system: 0.001 elapsed: 0.018
      # factor(  178 )                                                     user: 0.016 system: 0.002 elapsed: 0.018 Real: 0.271 User: 0.180 Sys.: 0.059 CPU share: 88.04 %
      # factor( 1078 )                                                     user: 0.017 system: 0.005 elapsed: 0.021 Real: 0.236 User: 0.188 Sys.: 0.047 CPU share: 99.94 %
      # factor( 2178 )                                  Timing stopped at: user: 0.017 system: 0.005 elapsed: 0.023 Error: C stack usage  7972624 is too close to the limit : Execution halted
      #
      # load(   4 )                                                        user: 2.230 system: 0.005 elapsed: 2.253 Real: 2.483 User: 2.398 Sys.: 0.048 CPU share: 98.25 %
      # load(   8 )                                                        user: 4.435 system: 0.013 elapsed: 4.481 Real: 4.753 User: 4.614 Sys.: 0.067 CPU share: 98.49 %
      # load(  16 )                                                        user: 8.620 system: 0.009 elapsed: 8.693 Real: 8.932 User: 8.775 Sys.: 0.059 CPU share: 98.91 %
      # load(  32 )                                                        user:20.522 system: 0.018 elapsed:20.892 Real:21.167 User:20.699 Sys.: 0.077 CPU share: 98.15 %
      # load(  64 )                                                        user:35.212 system: 0.007 elapsed:35.611 Real:35.861 User:35.374 Sys.: 0.053 CPU share: 98.78 %
      # load( 100 )                                                        user:54.721 system: 0.009 elapsed:55.128 Real:55.377 User:54.875 Sys.: 0.065 CPU share: 99.21 %
      # load( 104 )                                                        user:57.087 system: 0.012 elapsed:57.532 Real:57.743 User:57.245 Sys.: 0.060 CPU share: 99.24 %
      #
      # ___________________________________________________________________
      # https://www.jdoodle.com/execute-r-online/
      # ___________________________________________________________________
      #
      # factor(   78 ) CPU Time: 0.27 sec(s), Mem: 52532 kB(s)             user: 0.028 system: 0.000 elapsed: 0.030
      # factor(  178 ) CPU Time: 0.28 sec(s), Mem: 53212 kB(s)             user: 0.024 system: 0.004 elapsed: 0.028
      # factor( 1078 ) CPU Time: 0.27 sec(s), Mem: 56884 kB(s)             user: 0.024 system: 0.008 elapsed: 0.037
      # factor( 2178 ) CPU Time: 0.28 sec(s), Mem: 60768 kB(s) stopped at: user: 0.028 system: 0.004 elapsed: 0.032 Error: C stack usage  7971588 is too close to the limit : Execution halted
      #
      #
      # load(  2 ) executed in  4.025 sec(s)                               user: 2.584 system: 0.008 elapsed: 2.630
      # load(  4 ) executed in  6.005 sec(s)                               user: 4.704 system: 0.016 elapsed: 4.791
      # load(  8 ) executed in 11.036 sec(s)                               user: 9.372 system: 0.028 elapsed: 9.572
      # load( 16 ) executed in 20.085 sec(s)                               user:18.388 system: 0.068 elapsed:18.789
      # load( 32 ) executed in 39.093 sec(s)                               user:37.664 system: 0.060 elapsed:37.974
      
      
      # library(foreach)
      # (n_cores <- parallel::detectCores())
      ## > [1] 4
      # clustermq::register_dopar_cmq( n_jobs = n_cores ) ############################ REGISTER a parallel-backend
      # system.time( foreach( i = seq_len( n_cores ) ) %dopar% load( 200 ) )
      ## > Submitting 4 worker jobs (ID: 6856) ...
      ## >    user  system elapsed 
      ## >   0.118   0.022  20.187
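The "CPU share" figures in the timings above appear to be roughly (user + system) / elapsed. A minimal sketch of computing that ratio from the object system.time() returns (the helper name cpu_share is hypothetical). For a %dopar% call it reflects only the master R process, not the worker processes, so it cannot by itself show whether the workers ran in parallel; comparing elapsed times is the more direct check.

```r
# Hypothetical helper: fraction of wall-clock time this R process spent on CPU,
# computed from the proc_time object returned by system.time().
cpu_share <- function(pt) {
  (pt[["user.self"]] + pt[["sys.self"]]) / pt[["elapsed"]]
}

pt <- system.time(for (i in 1:2e6) sqrt(i))
round(100 * cpu_share(pt), 2)  # percent; near 100 for a CPU-bound loop
```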
      

      【Comments】:

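      • Thanks for the reply. So, if I understand this correctly, the load function is constructed so that the foreach + %dopar% construct can be observed while it executes a computationally heavy task. Oddly, whenever I run the line foreach( i = seq_len( n_cores ) ) %dopar% load( 200 ), I get the error Error: evaluation nested too deeply: infinite recursion / options(expressions=)?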
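One plausible cause of that error, offered as an assumption rather than something confirmed in this thread, is the recursion depth of factor(): each call such as factor( 1078 ) nests over a thousand R function calls, and a worker process may reach R's evaluation-depth limit where an interactive session does not. A minimal sketch of an iterative drop-in (the name factor_iter is hypothetical) that returns the same values as the recursive version for whole-number inputs, while keeping the workload an interpreted, CPU-bound loop:

```r
# Hypothetical iterative replacement for the recursive factor() above:
# same results for whole-number x (x < 3 returns x; otherwise x!),
# but no recursion, so call depth stays constant.
factor_iter <- function(x) {
  if (x < 3) return(x)
  out <- 1
  for (i in 2:x) out <- out * i  # overflows to Inf beyond 170!, as before
  out
}

factor_iter(5)     # 120
factor_iter(1078)  # Inf
```

Inside load(), each factor( ... ) call can then be replaced by factor_iter( ... ).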