【问题标题】:Parallel computing, which alternative to tidyr::complete in dplyr?并行计算,在 dplyr 中哪个替代 tidyr::complete?
【发布时间】:2020-06-24 11:17:46
【问题描述】:

我正在尝试并行化管道。 在管道中有一个 tidyr 命令(“tidyr::complete”)。一旦并行运行,这会破坏代码,因为无法识别对象类。

在 dplyr 中是否有替代方法来完成?

library(dplyr)
library(tidyr)
library(zoo)


test <- tibble(year=c(1,2,3,4,5,5,1,4,5),
               var_1=c(1,1,1,1,1,1,2,2,2), 
               var_2=c(1,1,1,1,1,2,3,3,3), 
               var_3=c(0,5,NA,15,20,NA,1,NA,NA))

max_year <- max(test$year,na.rm = T)
min_year <- min(test$year,na.rm = T)

连续剧


test_serial <- test %>% 
  group_by(var_1,var_2) %>% 
  complete(var_1, year = seq(min_year,max_year)) %>%
  mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE))


并行(失败)

devtools::install_github("hadley/multidplyr")
library(multidplyr)

cl <- new_cluster(2)
cluster_copy(cl, c("test","max_year","min_year"))
cluster_library(cl, c("dplyr","tidyr","zoo"))

test_parallel <- test %>% group_by(var_1,var_2) %>% partition(cl)
test_parallel <- test_parallel %>% 
  dplyr::group_by(var_1,var_2) %>% 
  tidyr::complete(var_1, year = seq(min_year,max_year)) %>%
  dplyr::mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) %>% 
  collect()

这是错误信息

Error in UseMethod("complete_") : 
  no applicable method for 'complete_' applied to an object of class "multidplyr_party_df"

【问题讨论】:

    标签: r dplyr parallel-processing multidplyr


    【解决方案1】:

    Multidplyr 允许您:

    1. 使用partition()拆分数据
    2. 在专用节点上处理每个分区
    3. collect()结果

    所有数据处理任务都不适用于之前的工作流程。

    特别是,complete 需要知道输入数据中所有可能的值才能创建缺失的行,这意味着该操作作为一个整体不能拆分,所以没有适用的方法。

    在您提供的示例中,每个节点都会收到一个 var_1, var_2 对,而不知道其他节点得到了什么,这不允许并行实现预期结果。

    但是,正如您已经知道 year = seq(min_year,max_year) 一样,您可以仅为此变量并行化 complete 任务,按 var_1 拆分任务,例如使用 furrr 包:

    library(furrr)
    plan(multiprocess)
    test_parallel <- test %>% 
      group_by(var_1,var_2) %>% 
      complete(var_1) %>% split(.$var_1) %>% 
      furrr::future_map(~{
        complete(.x, year = seq(min_year,max_year)) %>%
        dplyr::mutate(
            var_3 = na.approx(var_3,na.rm = FALSE),
            var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) 
        }) %>% bind_rows()
    
    > identical(c(test_serial$var_1,test_serial$var_2,test_serial$var_3,test_serial$year),
    +           c(test_parallel$var_1,test_parallel$var_2,test_parallel$var_3,test_parallel$year))
    [1] TRUE
    

    将在更大的数据集上进行测试,以衡量 performance 的潜在改进。

    【讨论】:

    • 但是,我是否能够为每个节点分配一个完整的组来完成(比如 var_1),那么我可以使用完成吗?
    • 我收到以下警告:#Warning 消息:[ONE-TIME WARNING] 从 RStudio 运行 R 时,将来会禁用分叉处理('multicore')(>= 1.13.0),因为它被认为是不稳定的。因此,plan("multicore") 将回退到 plan("sequential"),而 plan("multiprocess") 将回退到 plan("multisession") - 不像过去那样 plan("multicore") .有关更多详细信息,如何控制分叉处理,以及如何在未来的 R 会话中消除此警告,请参阅 ?future::supportsMulticore
    • 作为证据,system.time() 显示 furrr 解决方案比标准解决方案慢 4 倍
    • 多任务处理在小型示例数据集上的速度通常较慢,因为您需要打开任务,将数据传输给它们并收集结果,这对于 15 行来说比处理数据需要更多时间直接看我最后一句话;-)
    猜你喜欢
    • 1970-01-01
    • 2016-06-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-27
    • 2016-01-03
    • 2017-09-14
    相关资源
    最近更新 更多