【Question title】: Start, End and Duration of Maximum Drawdown in Python
【Posted】: 2014-05-01 16:29:20
【Question】:

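Given a time series, I want to calculate the maximum drawdown, and I also want to locate the points where it starts and ends so that I can compute its duration. I want to mark the start and end of the drawdown on a plot of the time series.

So far I have code that generates a random time series and code that calculates the maximum drawdown. If anyone knows how to identify where the drawdown begins and ends, I would really appreciate it!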

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# create random walk which I want to calculate maximum drawdown for:

T = 50
mu = 0.05
sigma = 0.2
S0 = 20
dt = 0.01
N = round(T/dt)
t = np.linspace(0, T, N)
W = np.random.standard_normal(size = N) 
W = np.cumsum(W)*np.sqrt(dt) ### standard brownian motion ###
X = (mu-0.5*sigma**2)*t + sigma*W 

S = S0*np.exp(X) ### geometric brownian motion ###
plt.plot(S)

# Max drawdown function      

def max_drawdown(X):
    mdd = 0
    peak = X[0]
    for x in X:
        if x > peak: 
            peak = x
        dd = (peak - x) / peak
        if dd > mdd:
            mdd = dd
    return mdd    

mdd = max_drawdown(S)  # a single fraction between 0 and 1, not a series
MaxDD = mdd * 100      # maximum drawdown in percent
print(MaxDD)


plt.show()

【Question comments】:

Tags: python numpy time-series algorithmic-trading


【Solution 1】:

Just find where the running maximum minus the current value is largest:

n = 1000
xs = np.random.randn(n).cumsum()
i = np.argmax(np.maximum.accumulate(xs) - xs) # end of the period
j = np.argmax(xs[:i]) # start of period

plt.plot(xs)
plt.plot([i, j], [xs[i], xs[j]], 'o', color='Red', markersize=10)

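【Comments】:

  • A really clean solution for the maximum drawdown!
  • If you don't mind, could you explain the code for i and j?
  • For i: np.maximum.accumulate(xs) gives us the cumulative maximum. Taking the difference between that and xs and finding its argmax gives us the position where the drop from the running maximum is largest. Then for j: xs[:i] takes all points from the start of the period up to point i, where the maximum drawdown ends. np.argmax(xs[:i]) finds the position/index of the highest (maximum) point in the graph up to that point, which is the peak we are looking for.
  • If there is no drawdown (every point is higher than the previous one), this method throws an error, because i is 0 and xs[:i] is empty. You should check whether i == 0; if so, the drawdown is 0 as well.
  • Could you please show how to put real dates on the x-axis of this drawdown plot? The code works so far, but only for numpy arrays. What if the time series comes as a pandas Series with timestamps as the index? In other words, it would be really nice to show real dates on the plot, so you get a sense of the time frame you are looking at. Thanks!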
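
The last two comments can be addressed together. The following is a minimal sketch, not the answerer's code: it assumes the data is a pandas Series with a sorted DatetimeIndex, guards against the i == 0 case, and returns index labels (dates) instead of integer positions. The function name max_drawdown_period and the sample prices are made up for illustration.

```python
import numpy as np
import pandas as pd

def max_drawdown_period(series):
    """Return (start, end) index labels of the largest nominal drawdown,
    or (None, None) if the series never falls below its running maximum."""
    values = series.to_numpy()
    running_max = np.maximum.accumulate(values)
    i = int(np.argmax(running_max - values))  # position of the trough
    if i == 0:
        # the largest drop is 0, so there is no drawdown at all
        return None, None
    j = int(np.argmax(values[:i]))  # position of the peak before the trough
    return series.index[j], series.index[i]

# hypothetical daily prices, for illustration only
dates = pd.date_range("2020-01-01", periods=6, freq="D")
prices = pd.Series([10.0, 12.0, 9.0, 11.0, 8.0, 13.0], index=dates)
start, end = max_drawdown_period(prices)
print(start.date(), end.date())
```

To show the dates on the x-axis, one option is to plot with plt.plot(prices.index, prices.values) and then mark the two points with plt.plot([start, end], [prices[start], prices[end]], 'o', color='Red', markersize=10).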
【Solution 2】:

In case this helps anyone, I added underwater analysis...

def drawdowns(equity_curve):
    i = np.argmax(np.maximum.accumulate(equity_curve.values) - equity_curve.values) # end of the period
    j = np.argmax(equity_curve.values[:i]) # start of period

    # i and j are integer positions, so use .iloc (plain [] would look up index labels)
    drawdown=abs(100.0*(equity_curve.iloc[i]-equity_curve.iloc[j]))

    DT=equity_curve.index.values

    start_dt=pd.to_datetime(str(DT[j]))
    MDD_start=start_dt.strftime ("%Y-%m-%d") 

    end_dt=pd.to_datetime(str(DT[i]))
    MDD_end=end_dt.strftime ("%Y-%m-%d") 

    NOW=pd.to_datetime(str(DT[-1]))
    NOW=NOW.strftime ("%Y-%m-%d")

    MDD_duration=np.busday_count(MDD_start, MDD_end)

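    try:
        # first date at or after the trough on which equity is back at the previous peak
        UW_dt=equity_curve.iloc[i:].loc[equity_curve.iloc[i:].values>=equity_curve.iloc[j]].index.values[0]
        UW_dt=pd.to_datetime(str(UW_dt))
        UW_dt=UW_dt.strftime ("%Y-%m-%d")
        UW_duration=np.busday_count(MDD_end, UW_dt)
    except IndexError:
        # no recovery yet: measure the underwater period up to the last date
        UW_dt="0000-00-00"
        UW_duration=np.busday_count(MDD_end, NOW)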

    return MDD_start, MDD_end, MDD_duration, drawdown, UW_dt, UW_duration

【Comments】:

    【Solution 3】:

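    behzad.nouri's solution is very clean, but it does not give the maximum drawdown (I couldn't leave a comment because I just opened my account and don't have enough reputation at the moment).

    What you end up with is the largest drop in nominal value, not the largest relative (percentage) drop. For example, if you apply it to a time series that rises over the long run (such as a stock market index like the S&P 500), the most recent drop will take precedence over an earlier one whenever its drop in nominal value/points is larger, even if its percentage drop is smaller.

    For example, the S&P 500:

    • 2007-08 financial crisis: a 56.7% drop, 888.62 points
    • Recent coronavirus crisis: a 33.9% drop, 1,148.75 points

    Applying this method to the period after 2000, you will get the coronavirus crisis rather than the 2007-08 financial crisis.

    The relevant code (from behzad.nouri) is below: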

    n = 1000
    xs = np.random.randn(n).cumsum()
    i = np.argmax(np.maximum.accumulate(xs) - xs) # end of the period
    j = np.argmax(xs[:i]) # start of period
    
    plt.plot(xs)
    plt.plot([i, j], [xs[i], xs[j]], 'o', color='Red', markersize=10)
    

    You just need to divide that drop in nominal value by the running maximum to get the relative (%) drop.

    ( np.maximum.accumulate(xs) - xs ) / np.maximum.accumulate(xs)
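
    A minimal sketch of how that relative drop can be used to locate the start and end of the maximum drawdown. It assumes strictly positive prices (otherwise the division is not meaningful); the function name relative_max_drawdown and the sample prices are made up for illustration.

```python
import numpy as np

def relative_max_drawdown(xs):
    """Return (peak_pos, trough_pos, drawdown_fraction) for strictly positive prices."""
    xs = np.asarray(xs, dtype=float)
    running_max = np.maximum.accumulate(xs)
    rel_dd = (running_max - xs) / running_max  # relative drop from the running maximum
    i = int(np.argmax(rel_dd))                 # trough of the largest relative drop
    if i == 0:
        return 0, 0, 0.0                       # the series never drops
    j = int(np.argmax(xs[:i]))                 # peak before that trough
    return j, i, float(rel_dd[i])

# made-up prices: the early 100 -> 40 drop is larger in percent,
# the later 300 -> 200 drop is larger in points
prices = [100.0, 40.0, 120.0, 300.0, 200.0, 350.0]
peak, trough, dd = relative_max_drawdown(prices)
print(peak, trough, dd)
```

    On this data the nominal version would pick the 300 -> 200 drop, while the relative version picks 100 -> 40.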
    

    【Comments】:

      【Solution 4】:

      Your max_drawdown already keeps track of the peak. Modify the if so that when it stores mdd it also stores the end position mdd_end, and return mdd, peak, mdd_end.
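
      A sketch of that modification, with one deliberate variation that is an assumption on my part rather than something this answer states: the position of the peak is saved at the moment mdd is updated, because the running peak may move to a later maximum after the largest drawdown has ended. The names max_drawdown_with_positions, mdd_start and mdd_end are illustrative.

```python
def max_drawdown_with_positions(X):
    """Return (mdd, mdd_start, mdd_end): the largest relative drawdown and the
    positions of its peak and trough. Assumes X is non-empty and positive."""
    mdd = 0.0
    peak = X[0]
    peak_pos = 0
    mdd_start = 0
    mdd_end = 0
    for pos, x in enumerate(X):
        if x > peak:
            peak = x
            peak_pos = pos
        dd = (peak - x) / peak
        if dd > mdd:
            mdd = dd
            mdd_start = peak_pos  # peak that this drawdown is measured from
            mdd_end = pos         # trough reached so far
    return mdd, mdd_start, mdd_end

result = max_drawdown_with_positions([1, 50, 10, 180, 40, 200])
print(result)
```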

      【Comments】:

        【Solution 5】:

        I agree with k0rnik.

        Here is a short example demonstrating that the formula given by behzad.nouri can produce a wrong result:

        xs = [1, 50, 10, 180, 40, 200]
        
        pos_min1 = np.argmax(np.maximum.accumulate(xs) - xs) # end of the period
        pos_peak1 = np.argmax(xs[:pos_min1]) # start of period
        
        pos_min2 = np.argmax((np.maximum.accumulate(xs) - 
        xs)/np.maximum.accumulate(xs)) # end of the period
        pos_peak2 = np.argmax(xs[:pos_min2]) # start of period
        
        plt.plot(xs)
        plt.plot([pos_min1, pos_peak1], [xs[pos_min1], xs[pos_peak1]], 'o', 
        label="mdd 1", color='Red', markersize=10)
        plt.plot([pos_min2, pos_peak2], [xs[pos_min2], xs[pos_peak2]], 'o', 
        label="mdd 2", color='Green', markersize=10)
        plt.legend()
        
        mdd1 = 100 * (xs[pos_min1] - xs[pos_peak1]) / xs[pos_peak1]
        mdd2 = 100 * (xs[pos_min2] - xs[pos_peak2]) / xs[pos_peak2]
        
        print(f"solution 1: peak {xs[pos_peak1]}, min {xs[pos_min1]}\n rate : {mdd1}\n")
        print(f"solution 2: peak {xs[pos_peak2]}, min {xs[pos_min2]}\n rate : {mdd2}")
        

        Also, the price of an asset cannot be negative, so

        xs = np.random.randn(n).cumsum()
        

        is not a valid price series. It is better to add:

        xs -= (np.min(xs) - 10)
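
        An alternative to shifting the series, sketched here as an assumption rather than as part of this answer: build the test data as a geometric random walk (the same idea as the geometric Brownian motion in the question), which is positive by construction. The seed, the starting price of 100 and the 1% volatility are arbitrary illustrative values.

```python
import numpy as np

rng = np.random.default_rng(0)  # arbitrary seed, only for reproducibility
n = 1000
log_returns = rng.normal(loc=0.0, scale=0.01, size=n)  # illustrative 1% step volatility
xs = 100.0 * np.exp(np.cumsum(log_returns))            # exp(...) > 0, so prices stay positive

# relative drawdown is now well defined at every point
running_max = np.maximum.accumulate(xs)
rel_dd = (running_max - xs) / running_max
```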
        

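        【Comments】:

          【Solution 6】:

          This solution has been tested and works, but note that here I compute the maximum-duration drawdown, not the duration of the maximum drawdown. The solution can easily be adapted to find the duration of the maximum drawdown.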

          def max_dur_drawdown(dfw, threshold=0.05):
              """
              Labels all drawdowns larger in absolute value than a threshold and returns the 
              drawdown of maximum duration (not the max drawdown necessarily but most often they
              coincide).
              
              Args:
                  dfw (pd.DataFrame): monthly data, the pre-computed drawdowns or underwater.
                  threshold (float): only look at drawdowns greater than this in absolute value e.g. 5%
                  
              Returns:
                  dictionary containing the start, end dates and duration in months for the maximum
                  duration drawdowns keyed by column name.
              """
              max_dur_per_column = {}
              columns = dfw.columns.copy()
              mddd_start = {}
              mddd_end = {}
              mddd_duration = {}
              for col in columns:
                  # run the drawdown labeling algorithm
                  dfw['sign'] = 0
                  # assign through dfw.loc to avoid chained assignment on a column copy
                  dfw.loc[dfw[col] == 0, 'sign'] = +1
                  dfw.loc[dfw[col] <  0, 'sign'] = -1
                  # find the sign change data points
                  dfw['change'] = dfw['sign'] != dfw['sign'].shift(1)
                  # the first change doesn't count
                  dfw.iloc[0, dfw.columns.get_loc('change')] = False
                  # demarcate the left and right of the drawdowns
                  left = dfw[(dfw['change'] == True) & (dfw['sign'] == -1)].index.values
                  right = dfw[(dfw['change'] == True) & (dfw['sign'] == 1)].index.values
                  min_len = min(len(left), len(right))
                  intervals = pd.IntervalIndex.from_arrays(left[0:min_len], right[0:min_len])
                  # find the minimum value per drawdown interval so we label all data points to the left of it.
                  min_per_int = list(map(lambda i: (i.left, i.right, dfw[col][(dfw.index >= i.left) & (dfw.index < i.right)].min()), intervals))
                  # filter out drawdowns lower in absolute value than a threshold
                  min_per_int = list(filter(None.__ne__, list(map(lambda x: None if x[2] >= -threshold else x, min_per_int))))
                  # label only the negative part of the underwater NDD stands for negative-side drawdown.
                  dfw['NDD'] = 0
                  mddd_start[col] = None
                  mddd_end[col] = None
                  mddd_duration[col] = 0
                  for i in min_per_int:
                      # find the index of the data point that is minimum this is an argmin
                      min_idx = dfw[(dfw.index >= i[0]) & (dfw.index < i[1]) & (abs(dfw[col] - i[2]) < 1e-15)].index[0]
                      # compute the duration and update the maximum duration if needed
                      tmp_dur = int(np.round((min_idx - i[0]) / np.timedelta64(1, 'M')))
                      if tmp_dur > mddd_duration[col]:
                          mddd_start[col] = i[0].date()
                          mddd_end[col] = min_idx.date()
                          mddd_duration[col] = tmp_dur
          
              return mddd_start, mddd_end, mddd_duration
              
          

          Example usage:

          # compute cumulative returns
          dfc = pd.DataFrame(dfr['S&P500'] / dfr['S&P500'].iloc[0])
          
          # compute drawdowns
          dfw = dfc / dfc.cummax() - 1
          
          print(max_dur_drawdown(dfw))
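
          A minimal sketch of the other quantity mentioned above, the duration of the maximum drawdown. It is not an adaptation of the function above line by line; it assumes a single positive price Series with a sorted DatetimeIndex and uses the same underwater definition as dfw. The function name max_drawdown_duration and the sample prices are made up for illustration.

```python
import pandas as pd

def max_drawdown_duration(prices):
    """Return (peak_date, trough_date, recovery_date, drawdown) of the maximum
    relative drawdown; recovery_date is None if the peak is never regained."""
    underwater = prices / prices.cummax() - 1.0  # 0 at new highs, negative below them
    trough = underwater.idxmin()
    if underwater.loc[trough] == 0:
        return None, None, None, 0.0             # the series never drops
    peak = prices.loc[:trough].idxmax()          # highest point up to the trough
    after = prices.loc[trough:]
    recovered = after[after >= prices.loc[peak]]
    recovery = recovered.index[0] if len(recovered) else None
    return peak, trough, recovery, float(underwater.loc[trough])

# hypothetical daily prices, for illustration only
dates = pd.date_range("2021-01-01", periods=8, freq="D")
prices = pd.Series([100.0, 110.0, 90.0, 80.0, 95.0, 111.0, 105.0, 120.0], index=dates)
peak, trough, recovery, dd = max_drawdown_duration(prices)
print("peak to trough:", trough - peak)
print("peak to recovery:", recovery - peak)
```

          Subtracting the dates gives a Timedelta, which sidesteps the month rounding used above; whether to count calendar days, business days or months is a reporting choice.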
          

          【Comments】:
