【问题标题】:Pandas: vectorize sliding time window aggregationPandas:矢量化滑动时间窗口聚合
【发布时间】:2021-08-04 11:12:02
【问题描述】:

我有一个大数据框,我需要从中获取一组给定查询点的滑动时间窗口平均值。我尝试使用df.rolling,但这不允许我查询任意点。以下方法有效,但似乎效率低下,并且不允许矢量化使用:

import pandas as pd
df = pd.DataFrame({'B': range(5)},
              index = [pd.Timestamp('20130101 09:00:00'),
                       pd.Timestamp('20130101 09:00:02'),
                       pd.Timestamp('20130101 09:00:03'),
                       pd.Timestamp('20130101 09:00:05'),
                       pd.Timestamp('20130101 09:00:06')])
query = pd.date_range(df.index[0], df.index[-1], freq='s')
time_window = pd.Timedelta(seconds=2)

f = lambda t: df[(t - time_window < df.index) & (df.index <= t)]["B"].mean()

[f(t) for t in query] # works but is slow
f(query) # throws ValueError length must match

也许这可以做得更好......

编辑:真正的应用程序具有在 30 到 90 秒之间随机出现的度量。有时有几天或几周没有数据的时期。 time_window 通常为 15 分钟。总体时间跨度为 10 年。

【问题讨论】:

    标签: pandas numpy vectorization


    【解决方案1】:

    你只是跳过了一小步。

    您的“查询”实际上是一个时间序列重采样操作。也就是说,除了计算滚动平均值之外,您还尝试以一秒的频率平滑地重新采样时间序列。您可以使用 asfreq 方法执行此操作,在滚动操作之前应用它:

    resample_rolling = df.asfreq('1s').rolling(pd.Timedelta(seconds=2)).mean()
     
    print(np.array([f(t) for t in query]))
    print(resample_rolling.to_numpy()[:, 0])
    

    输出:

    [0.  0.  1.  1.5 2.  3.  3.5]
    [0.  0.  1.  1.5 2.  3.  3.5]
    

    请注意,默认情况下,asfreq 方法使用nan 值填充缺失值。

    >>> df.asfreq(pd.Timedelta(seconds=1))
                           B
    2013-01-01 09:00:00  0.0
    2013-01-01 09:00:01  NaN
    2013-01-01 09:00:02  1.0
    2013-01-01 09:00:03  2.0
    2013-01-01 09:00:04  NaN
    2013-01-01 09:00:05  3.0
    2013-01-01 09:00:06  4.0
    

    滚动操作会忽略这些值。相反,如果您想用nans 以外的其他值填充值,您有两个选择。您可以提供fill_value

    >>> df.asfreq('1s', fill_value=0.0)
                           B
    2013-01-01 09:00:00  0.0
    2013-01-01 09:00:01  0.0
    2013-01-01 09:00:02  1.0
    2013-01-01 09:00:03  2.0
    2013-01-01 09:00:04  0.0
    2013-01-01 09:00:05  3.0
    2013-01-01 09:00:06  4.0
    

    或者你可以指定一个method,比如backfill,它使用系列中的下一个值:

    >>> df.asfreq('1s', method='backfill')
                         B
    2013-01-01 09:00:00  0
    2013-01-01 09:00:01  1
    2013-01-01 09:00:02  1
    2013-01-01 09:00:03  2
    2013-01-01 09:00:04  3
    2013-01-01 09:00:05  3
    2013-01-01 09:00:06  4
    

    当然,由此产生的滚动平均值是不同的:

    >>> df.asfreq('1s', method='backfill').rolling('1s').mean()
                           B
    2013-01-01 09:00:00  0.0
    2013-01-01 09:00:01  1.0
    2013-01-01 09:00:02  1.0
    2013-01-01 09:00:03  2.0
    2013-01-01 09:00:04  3.0
    2013-01-01 09:00:05  3.0
    2013-01-01 09:00:06  4.0
    

    【讨论】:

    • 好的。我确实理解这种方法,如果措施之间的时间很短,这种方法效果很好。在我的真实情况下,测量之间的时间或多或少在 30 到 90 秒之间随机变化,time_windows 是 15 分钟,我有 10 年的数据。所以 asfreq('1s') 会创建很多 NaN。
    • 您是否有理由不能使用与'1s' 不同的参数? asfreq 接受任意 Timedeltas 等。
    • 我想过这个,但测量的分辨率是以秒为单位的。我需要将索引四舍五入到分钟,如果两个度量仅相差 30 秒,这可能会导致不准确。
    • 对不起,我想我对你的问题还没有理解。您的慢速解决方案以一秒的频率重新采样数据。但这听起来毕竟不是您真正想要做的。你到底想做什么?
    • 感谢您的耐心等待,我很抱歉不准确。在过去的 10 年中,大约每分钟(30 到 90 秒)进行一次测量。对于一组等距的查询点(通常范围为 1 小时 50 点),我希望得到 15 分钟的平均值。
    【解决方案2】:

    经过一番研究,我想出了以下解决方案,其中有两个滚动窗口,一个用于进入窗口,一个用于离开:

    import pandas as pd, numpy as np
    df = pd.DataFrame({'B': range(5)},
                  index = [pd.Timestamp('20130101 09:00:00'),
                           pd.Timestamp('20130101 09:00:02'),
                           pd.Timestamp('20130101 09:00:03'),
                           pd.Timestamp('20130101 09:00:05'),
                           pd.Timestamp('20130101 09:00:06')])
    query = pd.date_range(df.index[0], df.index[-1], freq='s')
    time_window = pd.Timedelta(seconds=2)
    
    aggregates = ['mean']
    ### Preparation
    # one data point for each point entering the window
    df1 = df.rolling(window=time_window, closed='right').agg(aggregates)
    
    # one data point for each point leaving the window - use reverted df
    df2 = df[::-1].rolling(window=time_window, closed='left').agg(aggregates)
    df2.index += time_window
    # Caution: for my real data in the reverted rolling method, I had
    # to add a small Timedelta to window to function properly
    
    # merge both together and remove duplicates
    df_windowed = pd.concat([df1, df2])
    df_windowed.sort_index(inplace=True)
    df_windowed = df_windowed[~df_windowed.index.duplicated(keep='first')]
    
    ### the vectorized function
    # Caution: get_indexer returns -1 for not found values (below df.index.min()), 
    # which is interpreted as last value. But last value of df_windows is always NaN
    f = lambda t: df_windowed.iloc[
            df_windowed.index.get_indexer(t, method='ffill') if isinstance(t, (pd.Index, pd.Series, np.ndarray,)) else
            df_windowed.index.get_loc(t, method='ffill')
        ]["B"]["mean"].to_numpy()
    
    f(query)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-05-13
      • 2017-01-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-01
      • 2021-02-25
      相关资源
      最近更新 更多