【Question title】: Import CSV by chunk simultaneously with Parallel and fread in R
【Posted】: 2018-02-08 00:48:48
【Question】:

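I know how to open a connection and use read.table [Edit: fread does not allow connections] to read the data chunk by chunk, delete some rows, and collect the resulting pieces sequentially in a list. But is there a way to optimize this, so that the chunks are read with fread and processed simultaneously?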
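For reference, the sequential approach described above could look roughly like the following base-R sketch. It is not the asker's code: `read_csv_by_chunks` and its `filter_fun` argument are hypothetical names, and it assumes a comma-separated file with exactly one header line.

```r
# Sequential chunked reading through one open connection (base R only).
read_csv_by_chunks <- function(path, chunk_size = 1000,
                               filter_fun = function(df) df) {
  con <- file(path, open = "r")
  on.exit(close(con))

  # The first call consumes the header line plus the first chunk of rows
  chunk <- read.csv(con, nrows = chunk_size, header = TRUE)
  col_names <- names(chunk)
  pieces <- list()

  repeat {
    # Filter each chunk before storing it, so only kept rows stay in memory
    pieces[[length(pieces) + 1]] <- filter_fun(chunk)
    # Later calls continue from the connection's current position. At the end
    # of the file read.csv() either errors or returns zero rows; both end the loop
    chunk <- tryCatch(
      read.csv(con, nrows = chunk_size, header = FALSE, col.names = col_names),
      error = function(e) NULL
    )
    if (is.null(chunk) || nrow(chunk) == 0) break
  }
  # Stack the filtered chunks into one data frame
  do.call(rbind, pieces)
}
```

For example, `read_csv_by_chunks("test.csv", 1000, function(d) d[d$a > 0, ])` would keep only the rows with a positive `a`. Each chunk is read strictly after the previous one, which is exactly the bottleneck the question asks about.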

I am on Windows.

From what I have gathered online so far, I could use Cygwin's split to break my large csv file into several smaller csv files, and then use parLapply to read all of them.
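The split-then-read idea could be sketched as follows, assuming the pieces came from a line-based splitter such as `split -l`, so that only the first piece still carries the header line. `read_piece` and `read_split_files` are hypothetical helper names and `n_workers = 2` is an arbitrary default; `parLapply` with a PSOCK cluster is used because it also works on Windows.

```r
library(parallel)
library(data.table)

# Read piece i with fread; only the first piece contains the header line.
# Defined at top level so parLapply ships it without a captured environment.
read_piece <- function(i, files) {
  data.table::fread(files[i], header = (i == 1))
}

# Read pre-split CSV pieces in parallel and stack them in file order.
read_split_files <- function(files, n_workers = 2) {
  cl <- makeCluster(n_workers)
  on.exit(stopCluster(cl))

  # One task per file; extra arguments to parLapply are passed on to read_piece
  pieces <- parLapply(cl, seq_along(files), read_piece, files = files)

  # Bind by position, so the headerless pieces take the first piece's names
  rbindlist(pieces, use.names = FALSE)
}
```

With `files <- sort(list.files("pieces", full.names = TRUE))` (a hypothetical directory), `read_split_files(files)` returns one data.table. Sorting matters, because the first file must be the one holding the header.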

Do you have any better ideas?

【Question discussion】:

    Tags: r data.table


    【Solution 1】:

    Here is an attempt to parallelize fread calls over chunks of the data. The solution borrows heavily from these two questions:

    TryCatch with parLapply (Parallel package) in R

    import large number of .txt files to data.frame, include empty .txt files by giving them a row data.frame

    library(data.table)
    library(dplyr)
    library(parallel)
    
    gc()
    
    #=========================================================================
    # generating test data
    #=========================================================================
    
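    set.seed(1)
    m   <- matrix(rnorm(1e5), ncol = 2)  # 5e4 rows x 2 columns
    csv <- data.frame(x = 1:1e2, m)      # x = 1..100 is recycled down the 5e4 rows
    names(csv) <- letters[1:3]
    head(csv)
    # write.csv also writes the row names as an extra first column, so test.csv
    # has 4 columns (row name, a, b, c) plus one header line
    write.csv(csv, "test.csv")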
    
    #=========================================================================
    # defining function to read chunks of data with fread: fread_by_chunks
    #=========================================================================
    
    fread_by_chunks <- function(filepath, counter, ChunkSize, ...) {
    
        # counter is the number of lines to skip (header + previous chunks),
        # so chunk k starts at counter = (k - 1) * ChunkSize + 1
        chunk_id <- as.character((counter - 1) / ChunkSize + 1)
        print(paste0("Working on chunk ", chunk_id, "..."))
    
        # On error (e.g. skip runs past the end of the file) report it and return NULL
        DT <- tryCatch(fread(filepath, 
                             skip = counter, 
                             nrows = ChunkSize, 
                             ...), 
                       error = function(e) {
                         message(conditionMessage(e))
                         NULL
                       })
    
        if (!is.data.table(DT) || nrow(DT) == 0) {
          # An error occurred, or the chunk came back empty: return a marker row
          DT <- data.table(chunk = chunk_id, is.empty = "YES")
        } else {
          # Apply any row filter here. The columns have no header names,
          # so refer to them by position, e.g. DT[[1]]
          DT[, chunk := chunk_id]
          DT[, is.empty := "NO"]
        }
        return(DT)
    }
    
    #=========================================================================
    # testing fread_by_chunks
    #=========================================================================
    
    ChunkSize <- 1000
    # test.csv has 5e4 data rows; n_rows deliberately overshoots that to check
    # that the code does not break when a chunk starts past the end of the file.
    n_rows <- 60000
    ## You have to guess how many rows the file you are reading has. Guess high so
    ## that every line gets read: each chunk past the end of the file comes back as
    ## a single row with is.empty == "YES", which you delete afterwards. If no such
    ## row appears, you cannot be sure that all rows of the csv file were read.
    
    counter <- c(0, seq(ChunkSize, n_rows, ChunkSize)) + 1
    
    start_time <- Sys.time()
    test <- lapply(counter, function(x) {
      fread_by_chunks(filepath = "test.csv", counter = x, ChunkSize = ChunkSize,
                      header = FALSE, fill = TRUE, blank.lines.skip = TRUE,
                      select = c(1, 2, 4))
    })
    Sys.time() - start_time
    ##Time difference of 0.2528741 secs
    
    # binding chunks
    test <- bind_rows(test)
    
    #=========================================================================
    # parallelizing fread_by_chunks
    #=========================================================================
    
    no_cores <- detectCores() - 1 # 3 cores here, 2.8 GHz
    cl <- makeCluster(no_cores)
    # data.table itself is loaded on every worker below, so only our own objects are exported
    clusterExport(cl, c("ChunkSize", "counter", "fread_by_chunks", "n_rows"))
    clusterEvalQ(cl, library(data.table))
    
    start_time <- Sys.time()
    test <- parLapply(cl, counter, function(x) {
      fread_by_chunks(filepath = "test.csv", counter = x, ChunkSize = ChunkSize,
                      header = FALSE, fill = TRUE, blank.lines.skip = TRUE,
                      select = c(1, 2, 4))
    })
    Sys.time() - start_time
    ##Time difference of 0.162251 secs
    stopCluster(cl)
    
    test <- bind_rows(test)
    
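    # Calling fread once on the whole file, without chunks. This is obviously much
    # faster, but it requires enough memory to hold all the data.
    
    start_time <- Sys.time()
    test <- fread("test.csv", 
                  skip = 1,        # skip the header line, as the chunked reads do
                  header = FALSE, 
                  fill = TRUE, 
                  blank.lines.skip = TRUE, 
                  select = c(1, 2, 4))
    Sys.time() - start_time
    #Time difference of 0.006005049 secs (original run, which also passed nrows = ChunkSize and so read only the first 1000 rows)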
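The guess for `n_rows` can be avoided by counting the lines first. A minimal sketch, assuming exactly one header line as in test.csv; `count_lines` and `chunk_counters` are hypothetical helpers, and counting costs one extra sequential pass over the file.

```r
# Count lines by reading block_size lines at a time through an open connection,
# so the whole file never has to be held in memory.
count_lines <- function(path, block_size = 100000L) {
  con <- file(path, open = "r")
  on.exit(close(con))
  n <- 0  # double, so very large files cannot overflow an integer
  repeat {
    k <- length(readLines(con, n = block_size))
    if (k == 0L) break
    n <- n + k
  }
  n
}

# One `counter` value per chunk for fread_by_chunks(): with exactly one header
# line, skip = counter lands on the first data row of each chunk.
chunk_counters <- function(path, chunk_size) {
  n_data_rows <- count_lines(path) - 1
  if (n_data_rows < 1) return(numeric(0))
  seq(1, n_data_rows, by = chunk_size)
}
```

`counter <- chunk_counters("test.csv", ChunkSize)` would then replace the guessed sequence and produce no empty chunks. If a guessed `n_rows` is kept instead, the marker rows can be dropped with `test <- test[test$is.empty == "NO", ]`.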
    

    【Discussion】:

      【Solution 2】:

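      I like your solution and your timing tests, but I wish I understood the problem more clearly. Is the issue that you don't have enough memory to read the whole file, or do you want to read and process the data faster through parallelization?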

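      If the problem is that the file is larger than memory, but the rows and columns you actually need would fit, then I suggest using awk to write a smaller csv that contains only those rows and columns, and then fread that. awk processes the file line by line, so memory will not be an issue. Here is example awk code that skips lines whose first field is empty (which includes blank lines) and writes columns 1, 2 and 4 to smaller.csv.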

      awk -F',' 'BEGIN{OFS=","}{if($1!="")print $1,$2,$4}' big.csv > smaller.csv
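The same filter can also be written in base R when awk is not at hand on Windows. This is a sketch with a hypothetical `filter_csv_columns` helper: like `awk -F','` it splits on every comma and does not understand quoted fields, and it will be slower than awk, but memory use stays bounded by `block_size` lines.

```r
# Stream infile block by block, keep lines whose first field is non-empty,
# and write only the requested fields to outfile (like the awk one-liner).
filter_csv_columns <- function(infile, outfile, fields = c(1, 2, 4),
                               block_size = 10000L) {
  inp <- file(infile, open = "r")
  out <- file(outfile, open = "w")
  on.exit({
    close(inp)
    close(out)
  })
  repeat {
    lines <- readLines(inp, n = block_size)
    if (length(lines) == 0L) break
    parts <- strsplit(lines, ",", fixed = TRUE)
    # Equivalent of awk's $1 != "": drops blank lines and empty first fields
    keep <- vapply(parts, function(p) length(p) > 0 && nzchar(p[1]), logical(1))
    rows <- vapply(parts[keep], function(p) {
      vals <- p[fields]
      vals[is.na(vals)] <- ""  # strsplit drops trailing empty fields; awk prints ""
      paste(vals, collapse = ",")
    }, character(1))
    writeLines(rows, out)
  }
  invisible(outfile)
}
```

After `filter_csv_columns("big.csv", "smaller.csv")`, the result can be read with `data.table::fread("smaller.csv")`.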
      

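      If the problem is speed, my guess is that the fastest option is to fread the file once and then do the processing in parallel with, for example, parLapply or the simpler mclapply (mclapply relies on forking, so on Windows it cannot use more than one core).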
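A minimal sketch of the read-once, process-in-parallel idea, using `parLapply` so that it also runs on Windows. `process_in_parallel` and `process_fun` are hypothetical names, and `dt` is assumed to be a data.table already loaded with a single `fread` call. The table is cut into one contiguous block of rows per worker, and every block is serialized to its worker, so this pays off for CPU-heavy processing rather than for the read itself.

```r
library(parallel)
library(data.table)

process_in_parallel <- function(dt, process_fun, n_workers = 2) {
  # Label rows with contiguous block ids 1..n_workers and split the row indices
  block_id <- ceiling(seq_len(nrow(dt)) * n_workers / nrow(dt))
  blocks <- lapply(split(seq_len(nrow(dt)), block_id), function(idx) dt[idx, ])

  cl <- makeCluster(n_workers)
  on.exit(stopCluster(cl))
  # Workers need data.table loaded so that `[` dispatches on the blocks
  clusterEvalQ(cl, library(data.table))

  # Each worker processes one block; results come back in block order
  results <- parLapply(cl, blocks, process_fun)
  rbindlist(results)
}
```

For example, `process_in_parallel(fread("test.csv"), function(b) b[b$a > 0, ])` would filter the blocks on separate workers and return one stacked data.table.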

      【Discussion】:
