【问题标题】:Find nearest matches for each row and sum based on a condition为每一行查找最接近的匹配项并根据条件求和
【发布时间】:2018-03-01 04:23:16
【问题描述】:

考虑以下 data.table 事件:

library(data.table)
# Example event table: one row per event with its channel, time, day and ZA.
breaks <- data.table(id = 1:8,
                     Channel = c("NP1", "NP1", "NP2", "NP2", "NP3", "NP3", "AT4", "AT4"),
                     Time = c(1000, 1100, 975, 1075, 1010, 1080, 1000, 1050),
                     Day = c(1, 1, 1, 1, 1, 1, 1, 1),
                     ZA = c(15, 12, 4, 2, 1, 2, 23, 18),
                     # Spell out FALSE: `F` is an ordinary variable that can be reassigned.
                     stringsAsFactors = FALSE)

breaks
   id Channel Time Day ZA
1:  1     NP1 1000   1 15
2:  2     NP1 1100   1 12
3:  3     NP2  975   1  4
4:  4     NP2 1075   1  2
5:  5     NP3 1010   1  1
6:  6     NP3 1080   1  2
7:  7     AT4 1000   1 23
8:  8     AT4 1050   1 18

对于 breaks 中的每个唯一事件,我想使用 Time 变量(且 Day 相同)在所有其他频道中找到时间上最接近的事件,然后对这些事件的 ZA 值求和。

这是我想要达到的结果:

   id Channel Time Day ZA Sum
1:  1     NP1 1000   1 15  28
2:  2     NP1 1100   1 12  22
3:  3     NP2  975   1  4  39
4:  4     NP2 1075   1  2  32
5:  5     NP3 1010   1  1  42
6:  6     NP3 1080   1  2  32
7:  7     AT4 1000   1 23  20
8:  8     AT4 1050   1 18  19

所以对于第一行,频道是 NP1。在所有其他频道中,与 Time = 1000 最接近的事件是第 3、5 和 7 行:4 + 1 + 23 = 28。

我通过以下代码使用 data.table 使其工作:

# For every event, sum the ZA of the closest-in-time event of each other channel.
# The first step self-joins on Day (cartesian within a day); columns coming
# from the inner table are exposed with an `i.` prefix.
breaks[breaks[, c("Day", "Time", "Channel", "ZA")], on = "Day", allow.cartesian = TRUE][
  Channel != i.Channel][  # drop pairs from the same channel
    order(id)][
      , delta := abs(Time - i.Time)][  # absolute time gap of each pair
        , .SD[delta == min(delta)], by = .(Channel, Time, Day, i.Channel)][  # closest row(s) per other channel
          , unique(.SD, by = c("id", "i.Channel"))][  # keep a single row when gaps tie
            , .(Sum = sum(i.ZA)), by = .(id, Channel, Time, Day, ZA)]  # total per event

但是,这会在第一步中创建一个包含 64 行的数据集,我想使用超过一百万行的数据集来执行此操作。

谁能帮我找到更有效的方法?

编辑:

我在包含 39 个不同通道的 140 万行的完整数据集上尝试了 G. Grothendieck (sqldf)、eddi (data.table) 和 MarkusN (dplyr) 的解决方案。数据集在内存中。

sqldf:      54 minutes
data.table: 11 hours
dplyr:      29 hours

【问题讨论】:

    标签: r data.table sqldf


    【解决方案1】:

    偶然看到这个问题,并在 OP 的编辑中看到了各方案的耗时。因此,这里提出一种可能的 Rcpp 方法:

    library(Rcpp)
    #library(inline)
    # Compile a C++ helper: for every row, add up the ZA of the nearest-in-time
    # row of each other channel. Rows of one channel must be adjacent in the
    # input (sort by channel first); the call site applies it once per Day.
    nearsum <- cppFunction('
    NumericVector closestSum(NumericVector cid, NumericVector Time, NumericVector ZA) {
        // cid:  channel id per row (rows of one channel must be adjacent)
        // Time: event time per row
        // ZA:   value to accumulate per row
        // Returns, per row, the sum over all other channels of the ZA that
        // belongs to the row of that channel closest in time (the first row
        // wins when several rows are equally close).
        const int sz = cid.size();
        NumericVector sumvec(sz);

        for (int r = 0; r < sz; r++) {
            // Doubles keep fractional times and ZA values intact; integer
            // temporaries would silently truncate them.
            double mintime = 0.0;
            double mintimeZA = 0.0;
            double total = 0.0;

            for (int i = 0; i < sz; i++) {
                if (cid[i] == cid[r]) continue;   // only other channels count

                // Run boundaries are derived from the index itself instead of
                // a 0 sentinel, so any channel id value (including 0) is safe.
                const bool firstOfRun = (i == 0) || (cid[i] != cid[i - 1]);
                const bool lastOfRun = (i == sz - 1) || (cid[i] != cid[i + 1]);

                const double d = std::fabs(Time[i] - Time[r]);

                // The first row of a run always seeds the minimum; later rows
                // replace it only when strictly closer.
                if (firstOfRun || d < mintime) {
                    mintime = d;
                    mintimeZA = ZA[i];
                }

                // End of this channel run: commit its closest ZA.
                if (lastOfRun) {
                    total += mintimeZA;
                }
            }
            sumvec[r] = total;
        }
        return sumvec;
    }
    ')
    

    调用cpp函数:

    library(data.table)
    # nearsum() needs the rows of each channel to be adjacent within a Day.
    # Sorting by the unique `id` first would turn the remaining sort keys into
    # no-ops, so number the channels (in order of first appearance), sort by
    # (Day, ChannelID, Time) for the computation, then restore the id order.
    breaks[, ChannelID := .GRP, by = Channel]
    setorder(breaks, Day, ChannelID, Time)
    breaks[, Sum := nearsum(ChannelID, Time, ZA), by = .(Day)]
    setorder(breaks, id)
    

    输出:

       id Channel Time Day ZA ChannelID Sum
    1:  1     NP1 1000   1 15         1  28
    2:  2     NP1 1100   1 12         1  22
    3:  3     NP2  975   1  4         2  39
    4:  4     NP2 1075   1  2         2  32
    5:  5     NP3 1010   1  1         3  42
    6:  6     NP3 1080   1  2         3  32
    7:  7     AT4 1000   1 23         4  20
    8:  8     AT4 1050   1 18         4  19
    

    计时代码:

    #create a larger dataset
    # Stack 1e5 copies of `breaks`, one per Day, then sort and renumber the ids.
    largeBreaks <- rbindlist(
        lapply(seq_len(1e5), function(day) {
            one_day <- copy(breaks)
            one_day[, Day := day]
        })
    )
    setorder(largeBreaks, Day, Channel, Time)
    largeBreaks[, id := seq_len(.N)]
    
    library(sqldf)
    # Benchmark candidate 0: SQLite self-join through sqldf on `largeBreaks`.
    mtd0 <- function() {
        # Inner select: per (event, other channel) keep the row with the
        # smallest time gap. Outer select: sum those ZA values per event.
        query <- "select id, Channel, Time, Day, ZA, sum(bZA) Sum
         from (
           select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
           from largeBreaks a join largeBreaks b on a.Day = b.Day and a.Channel != b.Channel
           group by a.id, b.Channel)
         group by id"
        sqldf(query)
    }
    
    # Benchmark candidate 1: the compiled nearsum() helper applied per Day.
    mtd1 <- function() {
        # Channel rows must be adjacent within a Day before calling nearsum().
        setorderv(largeBreaks, c("Day", "Channel", "Time"))
        largeBreaks[, ChannelID := .GRP, by = "Channel"]
        largeBreaks[, Sum := nearsum(ChannelID, Time, ZA), by = "Day"]
    }
    
    library(microbenchmark)
    microbenchmark(mtd0(), mtd1(), times=3L)
    

    计时结果[另外还需要大约 5 秒(至少在我的机器上)来编译 cpp 函数]:

    Unit: milliseconds
       expr        min         lq       mean    median         uq        max neval
     mtd0() 10449.6696 10505.7669 10661.7734 10561.864 10767.8252 10973.7863     3
     mtd1()   365.4157   371.2594   386.6866   377.103   397.3221   417.5412     3
    

    【讨论】:

      【解决方案2】:

      这是使用 dplyr 和自联接的解决方案:

      library(dplyr)
      # Self-join approach: pair each event with all same-day events, keep the
      # closest event of every other channel, then total their ZA per event.
      breaks %>%
        inner_join(breaks, by = "Day", suffix = c("", ".y")) %>%
        filter(Channel != Channel.y) %>%                      # drop same-channel pairs
        group_by(id, Channel, Time, Day, ZA, Channel.y) %>%   # one group per (event, other channel)
        arrange(abs(Time - Time.y)) %>%                       # smallest time gap first
        filter(row_number() == 1L) %>%                        # first row of each group
        group_by(id, Channel, Time, Day, ZA) %>%              # back to one group per event
        summarise(Sum = sum(ZA.y)) %>%                        # add up ZA of the other channels
        ungroup() %>%
        select(id, Channel, Time, Day, ZA, Sum)
      

      也许我需要把我的回答说得更具体一些。与 data.table 不同,dplyr 能够将代码转换为 SQL。因此,如果您的数据存储在数据库中,您可以直接连接到包含数据的表,所有(或大部分)dplyr 代码都会在您的 DBMS 中执行。由于执行连接是每个 DBMS 的核心任务,因此您不必担心性能。

      但是,如果您的数据被导入 R 并且您担心 RAM 限制,则必须遍历数据帧的每一行。这也可以通过 dplyr 来完成:

      library(dplyr)
      # Row-by-row variant that avoids the self-join: for each event, look up the
      # closest same-day event of every other channel and add up their ZA.
      breaks %>%
        rowwise() %>%
        do({
          # `as_data_frame()` is deprecated; `as_tibble()` is its replacement.
          row <- as_tibble(.)
          nearest_total <-
            breaks %>%
            filter(Day == row$Day & Channel != row$Channel) %>%
            mutate(time_diff = abs(Time - row$Time)) %>%
            group_by(Channel) %>%
            arrange(time_diff, .by_group = TRUE) %>%   # reuse the gap computed above
            filter(row_number() == 1) %>%              # closest event per channel
            ungroup() %>%
            summarise(sum(ZA))

          # Attach the total to the current row; do() binds the rows back together.
          row %>% mutate(sumZA = nearest_total[[1]])
        })
      

      【讨论】:

      • 没有解决 OP 的问题 - 你做的和他们做的一模一样。
      • 我认为这是对问题的有效答案,比较多种方法很有用。它确实给出了与问题中的答案相同的答案。
      • @G.Grothendieck 不,绝对不是。 OP 关心的是对一百万行数据的自联接,而这个答案并没有涉及到这一点。
      【解决方案3】:

      我不确定这种方法的速度(可能很慢),但它在内存占用方面会非常节省:

      # For each row, roll-join on Time to the nearest event of every other
      # channel on the same Day and sum their ZA; only one row is expanded at a
      # time, which keeps memory use low.
      Channels <- breaks[, unique(Channel)]
      breaks[, Sum := breaks[breaks[row,
                                    .(Day, Channel = setdiff(Channels, Channel), Time)],
                             on = .(Day, Channel, Time), roll = 'nearest',
                             sum(ZA)]
             # seq_len() stays empty for a zero-row table, unlike 1:nrow()
             , by = .(row = seq_len(nrow(breaks)))]
      

      使用 setkey(breaks, Day, Channel, Time) 而不是 on 可能有助于提高速度。

      【讨论】:

        【解决方案4】:

        在内部 select 中,将 breaks 中的每一行与同一天、不同频道的行进行自连接,然后在与某个原始行连接的所有行中,对每个频道只保留绝对时间差最小的那一行。在外部 select 中,对每个 id 将来自其他频道的 ZA 求和,得到结果。

        请注意,这里假设 sqldf 使用默认的 SQLite 后端,并使用了该数据库特有的功能:如果在 select 中使用了 min,则该 select 中指定的其他列的值也会取自取得最小值的那一行。

        默认情况下,它将使用内存数据库——如果数据放得下,这是最好的选择;但如果您将 dbname = tempfile() 指定为 sqldf 的参数,它将使用文件而不是内存作为数据库。也可以添加一个或多个索引,这可能会也可能不会加快速度。更多示例请参见 sqldf 的 github 主页。

        library(sqldf)
        
        # Inner select: for each (event, other channel) pair keep the joined row
        # with the smallest absolute time gap (relies on SQLite filling the other
        # selected columns from the row that attains min()).
        # Outer select: sum the kept ZA values per event id.
        sqldf("select id, Channel, Time, Day, ZA, sum(bZA) Sum
         from (
           select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
           from breaks a join breaks b on a.Day = b.Day and a.Channel != b.Channel
           group by a.id, b.Channel)
         group by id")
        

        给予:

          id Channel Time Day ZA Sum
        1  1     NP1 1000   1 15  28
        2  2     NP1 1100   1 12  22
        3  3     NP2  975   1  4  39
        4  4     NP2 1075   1  2  32
        5  5     NP3 1010   1  1  42
        6  6     NP3 1080   1  2  32
        7  7     AT4 1000   1 23  20
        8  8     AT4 1050   1 18  19
        

        对于这种规模的问题,这比问题中的 data.table 代码略快,但对于更大规模的问题,必须重新进行比较。

        此外,由于不必具体化中间结果(取决于查询优化器)并且可以在内存外处理它(如果需要),因此它可能能够处理更大的大小。

        library(data.table)
        library(dplyr)
        library(sqldf)
        library(rbenchmark)
        
        # Time the three approaches on `breaks` and report the first four result
        # columns, fastest first.
        # Candidate 1: SQLite self-join through sqldf.
        benchmark(sqldf = 
        sqldf("select id, Channel, Time, Day, ZA, sum(bZA) Sum
         from (
           select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
           from breaks a join breaks b on a.Day = b.Day and a.Channel != b.Channel
           group by a.id, b.Channel)
         group by id"),
        
        # Candidate 2: the data.table cartesian self-join chain from the question.
        data.table = breaks[breaks[, c("Day", "Time", "Channel", "ZA")], on = "Day",
             allow.cartesian = TRUE][
          Channel != i.Channel][
            order(id)][
              , delta := abs(Time - i.Time)][
                , .SD[delta == min(delta)], by = .(Channel, Time, Day, i.Channel)][
                  , unique(.SD, by = c("id", "i.Channel"))][
                    , .(Sum = sum(i.ZA)), by = .(id, Channel, Time, Day, ZA)],
        
        # Candidate 3: the dplyr self-join pipeline.
        dplyr = { breaks %>% 
          inner_join(breaks, by=c("Day"), suffix=c("",".y")) %>%
          filter(Channel != Channel.y) %>%
          group_by(id, Channel, Time, Day, ZA, Channel.y) %>%
          arrange(abs(Time - Time.y)) %>%
          filter(row_number()==1) %>%
          group_by(id, Channel, Time, Day, ZA) %>%
          summarise(Sum=sum(ZA.y)) %>%                           
          ungroup() %>% 
          select(id:Sum) },
        
        # Sort the result rows by elapsed time.
        order = "elapsed")[1:4]
        

        给予:

                test replications elapsed relative
        1      sqldf          100    3.38    1.000
        2 data.table          100    4.05    1.198
        3      dplyr          100    9.23    2.731
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-02-08
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多