【问题标题】:Pandas: Zigzag segmentation of data based on local minima-maximaPandas:基于局部最小值-最大值的数据之字形分割
【发布时间】:2020-04-20 21:48:47
【问题描述】:

我有一个时间序列数据。生成数据

date_rng = pd.date_range('2019-01-01', freq='s', periods=400)
df = pd.DataFrame(np.random.lognormal(.005, .5,size=(len(date_rng), 3)),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)
s = df['data1']

我想创建一条连接局部最大值和局部最小值的曲折线,它满足在 y 轴上,每条曲折线的|highest - lowest value| 必须超过一个百分比(比如 20%)的条件前一条之字形线的距离,以及一个预先设定的值 k(比如 1.2)

我可以使用以下代码找到局部极值:

# Find peaks(max).
peak_indexes = signal.argrelextrema(s.values, np.greater)
peak_indexes = peak_indexes[0]

# Find valleys(min).
valley_indexes = signal.argrelextrema(s.values, np.less)
valley_indexes = valley_indexes[0]
# Merge peaks and valleys data points using pandas.
df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s[peak_indexes]})
df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s[valley_indexes]})
df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)

# Sort peak and valley datapoints by date.
df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])

但我不知道如何对其应用阈值条件。 请告诉我如何申请这样的条件。

由于数据可能包含数百万个时间戳,因此强烈建议进行高效计算

为了更清楚的描述:

示例输出,来自我的数据:

 # Instantiate axes.
(fig, ax) = plt.subplots()
# Plot zigzag trendline.
ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values, 
                                                        color='red', label="Zigzag")

# Plot original line.
ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)

# Format time.
ax.xaxis_date()
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

plt.gcf().autofmt_xdate()   # Beautify the x-labels
plt.autoscale(tight=True)

plt.legend(loc='best')
plt.grid(True, linestyle='dashed')

我想要的输出(与此类似,之字形仅连接重要段)

【问题讨论】:

    标签: python pandas time-series technical-indicator


    【解决方案1】:

    您可以使用 Pandas 滚动功能来创建局部极值。与您的 Scipy 方法相比,这稍微简化了代码。

    求极值的函数:

    def islocalmax(x):
        """Both neighbors are lower,
        assumes a centered window of size 3"""
        return (x[0] < x[1]) & (x[2] < x[1])
    
    def islocalmin(x):
        """Both neighbors are higher,
        assumes a centered window of size 3"""
        return (x[0] > x[1]) & (x[2] > x[1])
    
    def isextrema(x):
        return islocalmax(x) or islocalmin(x)
    

    创建 zigzag 的函数,它可以一次应用于 Dataframe(在每一列上),但这将引入 NaN,因为每列返回的时间戳不同。您可以稍后轻松删除它们,如下例所示,或者简单地将函数应用于 Dataframe 中的单个列。

    请注意,我取消了针对阈值 k 的测试的注释,我不确定是否完全正确理解了该部分。如果之前和当前极端之间的绝对差需要大于k,则可以包含它:&amp; (ext_val.diff().abs() &gt; k)

    我也不确定最终的曲折是否应该始终从原始高点移动到低点,反之亦然。我认为应该是这样,否则您可以在函数末尾删除第二次极端搜索。

    def create_zigzag(col, p=0.2, k=1.2):
    
        # Find the local min/max
        # converting to bool converts NaN to True, which makes it include the endpoints    
        ext_loc = col.rolling(3, center=True).apply(isextrema, raw=False).astype(np.bool_)
    
        # extract values at local min/max
        ext_val = col[ext_loc]
    
        # filter locations based on threshold
        thres_ext_loc = (ext_val.diff().abs() > (ext_val.shift(-1).abs() * p)) #& (ext_val.diff().abs() > k)
    
        # Keep the endpoints
        thres_ext_loc.iloc[0] = True
        thres_ext_loc.iloc[-1] = True
    
        thres_ext_loc = thres_ext_loc[thres_ext_loc]
    
        # extract values at filtered locations 
        thres_ext_val = col.loc[thres_ext_loc.index]
    
        # again search the extrema to force the zigzag to always go from high > low or vice versa,
        # never low > low, or high > high
        ext_loc = thres_ext_val.rolling(3, center=True).apply(isextrema, raw=False).astype(np.bool_)
        thres_ext_val  =thres_ext_val[ext_loc]
    
        return thres_ext_val
    

    生成一些样本数据:

    date_rng = pd.date_range('2019-01-01', freq='s', periods=35)
    
    df = pd.DataFrame(np.random.randn(len(date_rng), 3),
                      columns=['data1', 'data2', 'data3'],
                      index= date_rng)
    
    df = df.cumsum()
    

    应用函数并提取“data1”列的结果:

    dfzigzag = df.apply(create_zigzag)
    data1_zigzag = dfzigzag['data1'].dropna()
    

    可视化结果:

    fig, axs = plt.subplots(figsize=(10, 3))
    
    axs.plot(df.data1, 'ko-', ms=4, label='original')
    axs.plot(data1_zigzag, 'ro-', ms=4, label='zigzag')
    axs.legend()
    

    【讨论】:

    • 感谢您的回答。我想问一下这条线(ext_val.diff().abs() &gt; (ext_val.shift(-1).abs() * p)),据我了解,您是在将两点之间的距离与最后一点的p% 进行比较,对吗?因为我想把每一个zigzag段和上一段进行比较,一直重复直到满足条件。
    【解决方案2】:

    我已经回答了我对这个问题的最佳理解。然而,变量 K 如何影响过滤器尚不清楚。

    您想根据运行条件过滤极值。我假设您要标记与最后一个标记极值的相对距离大于 p% 的所有极值。我进一步假设您始终将时间序列的第一个元素视为有效/相关点。

    我使用以下过滤器功能实现了这一点:

    def filter(values, percentage):
        previous = values[0] 
        mask = [True]
        for value in values[1:]: 
            relative_difference = np.abs(value - previous)/previous
            if relative_difference > percentage:
                previous = value
                mask.append(True)
            else:
                mask.append(False)
        return mask
    

    为了运行你的代码,我首先导入依赖:

    from scipy import signal
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    

    为了使代码可复制,我修复了随机种子:

    np.random.seed(0)
    

    剩下的就是copypasta。请注意,我减少了样本量以使结果清晰。

    date_rng = pd.date_range('2019-01-01', freq='s', periods=30)
    df = pd.DataFrame(np.random.lognormal(.005, .5,size=(len(date_rng), 3)),
                      columns=['data1', 'data2', 'data3'],
                      index= date_rng)
    s = df['data1']
    # Find peaks(max).
    peak_indexes = signal.argrelextrema(s.values, np.greater)
    peak_indexes = peak_indexes[0]
    # Find valleys(min).
    valley_indexes = signal.argrelextrema(s.values, np.less)
    valley_indexes = valley_indexes[0]
    # Merge peaks and valleys data points using pandas.
    df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s[peak_indexes]})
    df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s[valley_indexes]})
    df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)
    # Sort peak and valley datapoints by date.
    df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])
    

    然后我们使用过滤功能:

    p = 0.2 # 20% 
    filter_mask = filter(df_peaks_valleys.zigzag_y, p)
    filtered = df_peaks_valleys[filter_mask]
    

    按照您之前的绘图以及新过滤的极值进行绘图:

     # Instantiate axes.
    (fig, ax) = plt.subplots(figsize=(10,10))
    # Plot zigzag trendline.
    ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values, 
                                                            color='red', label="Extrema")
    # Plot zigzag trendline.
    ax.plot(filtered['date'].values, filtered['zigzag_y'].values, 
                                                            color='blue', label="ZigZag")
    
    # Plot original line.
    ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)
    
    # Format time.
    ax.xaxis_date()
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    
    plt.gcf().autofmt_xdate()   # Beautify the x-labels
    plt.autoscale(tight=True)
    
    plt.legend(loc='best')
    plt.grid(True, linestyle='dashed')
    

    编辑

    如果想同时认为第一个点和最后一个点都有效,那么您可以按如下方式调整过滤器功能:

    def filter(values, percentage):
        # the first value is always valid
        previous = values[0] 
        mask = [True]
        # evaluate all points from the second to (n-1)th
        for value in values[1:-1]: 
            relative_difference = np.abs(value - previous)/previous
            if relative_difference > percentage:
                previous = value
                mask.append(True)
            else:
                mask.append(False)
        # the last value is always valid
        mask.append(True)
        return mask
    

    【讨论】:

    • 嗨,感谢您的出色回答。是的,您的假设是正确的“标记与最后一个标记的极值的相对距离大于 p% 的所有极值。”,并且应该始终考虑第一个点和最后一个点。我已经检查了你的答案,有时它错过了最后一点,你能帮我吗?
    猜你喜欢
    • 1970-01-01
    • 2020-01-30
    • 2013-08-12
    • 1970-01-01
    • 2021-09-12
    相关资源
    最近更新 更多