whiteBear

对指标的实现

分为两部分:

  1. 信号的计算

实现信号算法
检测历史信号
保存到数据库

  1. 信号使用

提供查询接口

我们将信号的计算与回测分离开,将计算后的信号结果保存到数据库中,供回测时调用,模式图如下:

指标

MACD(金叉和死叉)

走势展示:

\[DIFF(MACD线):短时EMA-长时EMA \]

\[DEA(信号线):DIFF的EMA \]

\[MACD(红绿柱):(DIFF - DEA) \]

具体的计算方法:
\(EMA_i\)

\[EMA_i = \frac{1}{N+1}(CLOSE_i - EMA_{i-1})+EMA_{i-1} \]

\[其中EMA_0 = CLOSE_0 \]

短时EMA

\[EMA_1 = EMA(CLOSE,short) \]

长时EMA

\[EMA_2= EMA(CLOSE,long \]

DIFF

\[DIFF=EMA_1 - EMA_2 \]

DEA

\[DEA = EMA(DIFF,m) \]

通常将短时EMA中参数short设为12,长时EMA中参数long设为26,m=9

早期国内股市一周是6个交易日,short=12相当于两周,一个月就26个交易日,

金叉与死叉

死叉

  • DIFF下穿DEA
  • \(DIFF_{i-1}>=DEA_{i-1}\) && \(DIFF_{i}<DEA_{i}\)

金叉

  • DIFF上传DEA
  • \(DIFF_{i-1}<=DEA_{i-1}\) && \(DIFF_{i}>DEA_{i}\)
金叉、死叉直观展示

流程图

EMA的计算流程

MACD金叉和死叉的计算流程

代码实现

信号的计算

from stock_util import get_all_codes
from database import DB_CONN
from pymongo import ASCENDING,UpdateOne
import pandas as pd
import traceback

def compute_macd(begin_date,end_date):
    """
    计算给定周期内的MACD金叉和死叉信号,把结果保存在数据库中
    :param begin_date:开始日期
    :param end_date:结束日期
    """

    #参数设定,这几个参数是计算MACD时需要的,取值都是常用值,也可以根据需要进行调整
    #短时
    short=12
    #长时
    long=26
    #计算DIFF的M值
    m = 9
    
    #获取股票列表
    codes = get_all_codes()

    #循环检测所有股票的MACD金叉和死叉信号
    for code in codes:
        try:
            
            #获取后复权的价格,使用后复权的价格计算MACD
            daily_cursor = DB_CONN[\'daily_hfq\'].find(
                {\'code\':code,\'date\':{\'$gte\':begin_date,\'$lte\':end_date},\'index\':False},
                sort=[(\'date\',ASCENDING)],
                projection={\'date\':True,\'close\':True,\'_id\':False})
            #将数据存入DataFrame格式
            df_daily = pd.DataFrame([daily for daily in daily_cursor])
            #将date设为index索引
            df_daily.set_index([\'date\'],1,inplace = True)
            
            #计算EMA
            #alpha = 2/(N+1)
            #EMA(i) = (1-alpha)*EMA[i-1]+alpha*CLOSE[i]
            #       = alpha*(CLOSE[i]-EMA[i-1]) + EMA[i-1]
            index = 0
            #短时EMA列表
            EMA1=[]
            #长时EMA列表
            EMA2 =[]
            #每天计算短时EMA和长时EMA
            for date in df_daily.index:
                if index == 0:
                    #第一天EMA就是当日的close,也就是收盘价
                    EMA1.append(df_daily.loc[date][\'close\'])
                    EMA2.append(df_daily.loc[date][\'close\'])
                else:
                    #短时EMA和长时EMA
                    EMA1.append(2/(short+1)*(df_daily.loc[date][\'close\']-EMA1[index-1])+EMA1[index-1])
                    EMA2.append(2/(long+1) * (df_daily.loc[date][\'close\'] - EMA2[index-1]) + EMA2[index-1])
                index+=1
            #将短时EMA和长时EMA作为DataFrame的数据列
            df_daily[\'EMA1\'] = EMA1
            df_daily[\'EMA2\'] = EMA2
            #计算DIFF,短时EMA - 长时EMA
            df_daily[\'DIFF\'] = df_daily[\'EMA1\'] - df_daily[\'EMA2\']
            #计算DEA,DIFF的EMA,计算公式为:EMA(DIFF,M)
            index = 0
            DEA = []
            for date in df_daily.index:
                if index ==0 :
                    DEA.append(df_daily.loc[date][\'DIFF\'])
                else:
                    #M = 9;DEA = EMA(DIFF,9)
                    DEA.append(2/(m+1)*(df_daily.loc[date][\'DIFF\']-DEA[index-1]) + DEA[index-1])
                index +=1
            df_daily[\'DEA\'] = DEA
            #计算DIFF与DEA的差值
            df_daily[\'delta\'] = df_daily[\'DIFF\'] - df_daily[\'DEA\']
            #将delta的移一位,那么前一天的delta就变成了今天的pre_delta,将数据整体向下移动
            df_daily[\'pre_delta\'] = df_daily[\'delta\'].shift(1)
            #金叉,DIFF上穿DEA,前一日DIFF在DEA下面,当日DIFF在DEA下面
            df_daily_gold = df_daily[(df_daily[\'pre_delta\']<=0) & (df_daily[\'delta\']>0)]
            #死叉,DIFF下穿DEA,前一日DIFF在DEA上面,当日DIFF在DEA下面
            df_daily_dead = df_daily[(df_daily[\'pre_delta\'] >= 0)&(df_daily[\'delta\']<0)]

            #保存结果到数据库中
            update_requests = []
            for date in df_daily_gold.index:
                #保存时以code和date为查询条件,做更新或新建,所以对code和date建立索引,通过signal字段表示金叉还是死叉,gold表示金叉
                update_requests.append(UpdateOne(
                    {\'code\':code,\'date\':date},
                    {\'$set\':{\'code\':code,\'date\':date,\'signal\':\'gold\'}},
                    upsert=True))
            for date in df_daily_dead.index:
                #保存时以code和date为查询条件,做更新或新建,所以对code和date建立索引,通过signal字段表示金叉还是死叉,dead表示死叉
                update_requests.append(UpdateOne(
                    {\'code\':code,\'date\':date},
                    {\'$set\':{\'code\':code,\'date\':date,\'signal\':\'dead\'}},
                    upsert=True))
            if len(update_requests)>0:
                update_result = DB_CONN[\'macd\'].bulk_write(update_requests,ordered=False)
                print(\'Save MACD ,股票代码:%s,插入:%4d,更新:%4d\' %(code,update_result.upserted_count,update_result.modified_count),flush=True)

        except:
            print(\'错误发生: %s\' % code, flush=True)
            traceback.print_exc()



if __name__ == "__main__":
    compute_macd(\'2019-01-01\',\'2019-12-31\')

提供外部接口,供回测

def is_macd_gold(code,date):
    """
    判断某只股票在某个交易日是否出现MACD金交信号
    :param code:股票代码
    :param date:日期
    :return :True -有金叉信号,False-无金叉信号
    """
    count = DB_CONN[\'macd\'].count({\'code\':code,\'date\':date,\'signal\':\'gold\'})
    return count == 1
def is_macd_dead(code, date):
    """
    判断某只股票在某个交易日是否出现MACD死叉信号
    :param code: 股票代码
    :param date: 日期
    :return: True - 有死叉信号,False - 无死叉信号
    """
    count = DB_CONN[\'macd\'].count({\'code\': code, \'date\': date, \'signal\': \'dead\'})
    return count == 1

回测结果展示


RSI(相对强弱指数)

定义

指的是一定时间窗口内,上涨幅度之和占整体涨跌幅度绝对值之和的比例
表现的是买入意愿相对于总体成交的强弱。当买方意愿达到超买区,意味着有可能会跌了,发出卖出预警,当卖方意愿达到超卖区,意味着有可能涨了,发出买入信号。这都蕴含着盛极而衰、否极泰来的意思。

超买区、超卖区

超卖区:RSI < 20
超买区:RSI > 80

强弱信号

超买:RSI上穿80
超卖:RSI下穿20

计算公式

\[change = close - prev_close \]

\[up_change = max(change,0) \]

\[RSI = mean(up_change,N) * 100 / mean(abs(change),N) \]

RSI超买超卖直观展示

流程图

代码实现

信号计算

from stock_util import get_all_codes,DB_CONN
import pandas as pd
import matplotlib.pyplot as plt
from pymongo import UpdateOne,ASCENDING
import traceback

def compute_rsi(begin_date,end_date):
    """
    计算指定时间段内的RSI信号,并保存到数据库中
    :param begin_date:开始日期
    :param end_date:结束日期
    """

    #获取所有股票代码
    codes = get_all_codes()
    #codes = [\'002468\']

    #计算RSI
    N = 12

    #计算所有股票的RSI信号
    for code in codes:
        try:
            #获取后复权价格,使用后复权的价格计算RSI
            daily_cursor = DB_CONN[\'daily_hfq\'].find(
                {\'code\':code,\'date\':{\'$gte\':begin_date,\'$lte\':end_date},\'index\':False},
                sort=[(\'date\',ASCENDING)],
                projection={\'date\':True,\'close\':True,\'_id\':False})
            df_daily = pd.DataFrame([daily for daily in daily_cursor])

            #如果查询出来的行情数据不足以计算N天的平均值,则不参与计算
            # if len(df_daily) < N:
            if df_daily.index.size < N:
                print(\'数据量不足:%s\' %code,flush= True)
                continue
            #将日期设置为索引
            df_daily.set_index([\'date\'],drop=True,inplace=True)
            #将close移一位作为当日的pre_close
            df_daily[\'pre_close\'] = df_daily[\'close\'].shift(1)
            #计算当日的涨跌幅:(close- pre_close) * 100/pre_close
            df_daily[\'change_pct\'] = (df_daily[\'close\'] - df_daily[\'pre_close\'])*100 / df_daily[\'pre_close\']
            #只保留上涨的日期的涨幅
            df_daily[\'up_pct\'] = pd.DataFrame({\'up_pct\':df_daily[\'change_pct\'],\'zero\':0}).max(1)
           

            #计算RSI
            df_daily[\'RSI\'] = df_daily[\'up_pct\'].rolling(N).mean() / abs(df_daily[\'change_pct\']).rolling(N).mean()*100

            #移位,记作prev_rsi
            df_daily[\'PREV_RSI\'] = df_daily[\'RSI\'].shift(1)
            print(df_daily,flush=True)
            #超买,RSI下穿80,作为卖出信号
            df_daily_over_bough = df_daily[(df_daily[\'RSI\']<80) & (df_daily[\'PREV_RSI\'] >=80)]
            #超卖,RSI上穿20,作为买入信号
            df_daily_over_sold = df_daily[(df_daily[\'RSI\'] >20) & (df_daily[\'PREV_RSI\'] <=20)]
            #保存到数据库,要以code和date创建索引,db.rsi.createIndex({\'index\':1,\'date\':1})
            update_requests = []
            #超买数据,以code和date为key更新数据,signal为over_bought
            for date in df_daily_over_bough.index:
                update_requests.append(UpdateOne(
                    {\'code\':code,\'date\':date},
                    {\'$set\':{\'code\':code,\'date\':date,\'signal\':\'over_bought\'}},
                    upsert = True))
            #超卖数据,以code和date为key更新数据,signal为over_sold
            for date in df_daily_over_sold.index:
                update_requests.append(UpdateOne(
                    {\'code\':code,\'date\':date},
                    {\'$set\':{\'code\':code,\'date\':date,\'signal\':\'over_sold\'}},
                    upsert=True))     
            
            if len(update_requests)>0:
                update_result = DB_CONN[\'rsi\'].bulk_write(update_requests,ordered=False)  
                print(\'Save RSI, 股票代码:%s, 插入:%4d, 更新:%4d\' %
                    (code, update_result.upserted_count, update_result.modified_count), flush=True) 
        except:
            print(\'错误发生:%s\'%code,flush=True)
            traceback.print_exc()
if __name__ == "__main__":
    compute_rsi(\'2019-01-01\',\'2019-12-31\')

提供外部接口,供回测

def is_rsi_over_sold(code, date):
    """
    判断某只股票在某个交易日是出现了超卖信号
    :param code: 股票代码
    :param date: 日期
    :return: True - 出现了超卖信号,False - 没有出现超卖信号
    """
    count = DB_CONN[\'rsi\'].count({\'code\': code, \'date\': date, \'signal\': \'over_sold\'})
    return count == 1

def is_rsi_over_bought(code,date):
    """
    判断某只股票在某个交易日是出现了超买信号
    :param code: 股票代码
    :param date: 日期
    :return: True - 出现了超买信号,False - 没有出现超买信号
    """
    count = count = DB_CONN[\'rsi\'].count({\'code\':code,\'date\':date,\'signal\':\'over_bought\'})
    return count ==1

回测结果展示


注:时间窗口长度、超买、超卖区间的阈值可根据实际情况自行调整

Boll(突破上轨与突破下轨)

定义

Boll是基于统计学的标准差原理
包含三条轨道:

  • 上轨:压力线
  • 中轨:价格平均线
  • 下轨:支撑线
计算公式(N = 20,k =2 )

价格均值

\[MA = \frac{1}{N}\sum_{i=0}^{N}CLOSE_i \]

标准差

\[STD = \sqrt{\frac{1}{N}\sum_{i=0}^{N}(CLOSE_i - MA)^2} \]

中轨(盘中实时计算,当日收盘价不稳定,所以计算的是前一天的均价)

\[MB = \frac{1}{N-1}\sum_{i=0}^{N-1}CLOSE_i \]

上轨

\[UP = MB + k * STD \]

下轨

\[DOWN = MB - k * STD \]

Boll直观展示

流程图

代码实现

信号计算

from stock_util import get_all_codes
from database import DB_CONN
import pandas as pd

from pymongo import ASCENDING,UpdateOne
import traceback 

def compute_boll(begin_date,end_date):
    """
    计算指定日期内的boll突破上轨和突破下轨信号,并保存到数据库中,方便查询调用
    :param begin_date:开始日期
    :param end_date:结束日期
    """
    #获取所有股票列表
    codes = get_all_codes()
    #计算每只股票的Boll值
    for code in codes:
        try:
            #获取后复权的价格,使用后复权的价格计算BOLL
            daily_cursor = DB_CONN[\'daily_hfq\'].find(
                {\'code\':code,\'date\':{\'$gte\':begin_date,\'$lte\':end_date},\'index\':False},
                sort=[(\'date\',ASCENDING)],
                projection={\'date\':True,\'close\':True,\'_id\':False})
            df_daily = pd.DataFrame([daily for daily in daily_cursor])
            #计算中轨;计算MB,盘后计算,这里用当日的close
            df_daily[\'MB\'] = df_daily[\'close\'].rolling(20).mean()
            #计算标准差,计算STD20,即20日的标准差
            df_daily[\'std\'] = df_daily[\'close\'].rolling(20).std()
            print(df_daily,flush=True)

            #计算上轨up
            df_daily[\'UP\'] = df_daily[\'MB\'] + 2 * df_daily[\'std\']
            #计算下轨down
            df_daily[\'DOWN\'] = df_daily[\'MB\'] - 2 * df_daily[\'std\']
            print(df_daily,flush=True)
            #将日期作为索引
            df_daily.set_index([\'date\'],inplace=True)
            #将close移动一位,变成当前索引位置的前收
            last_close = df_daily[\'close\'].shift(1)
            #将上轨移动一位,实现前一日的上轨和前一日的收盘价都在当日了
            shifted_up = df_daily[\'UP\'].shift(1)
            #计算突破上轨,是向上突破,条件是前一日的收盘价小于前一日上轨,当日收盘价大于当日上轨
            df_daily[\'up_mask\'] = (last_close <= shifted_up) & (df_daily[\'close\'] > shifted_up)
            #将下轨移动一位,前一日的下轨和前一日的收盘就就都在当日了
            shifted_down = df_daily[\'DOWN\'].shift(1)
            #突破下轨,向下突破,条件是前一日收盘价大于前一日下轨,当日收盘价小于当日下轨
            df_daily[\'down_mask\'] = (last_close >= shifted_down) & (df_daily[\'close\'] < shifted_down)
            #对结果进行过滤,只保留向上突破或者向下突破的数据
            df_daily = df_daily[df_daily[\'up_mask\'] | df_daily[\'down_mask\']]
            #从DataFrame中扔掉不用的数据
            df_daily.drop([\'close\',\'std\',\'MB\',\'UP\',\'DOWN\'],1,inplace=True)

            #将信号保存到数据库中
            update_requsts = []
            #DataFrame的索引为日期
            for date in df_daily.index:
                #保存数据包括股票代码、日期和信号类型,结合数据集的名字,就表示某只股票在某日
                doc = {
                    \'code\':code,
                    \'date\':date,
                    #方向,向上突破,UP,向下突破,DOWN
                    \'direction\':\'up\' if df_daily.loc[date][\'up_mask\'] else \'down\'
                }
                update_requsts.append(UpdateOne(doc,{\'$set\':doc},upsert=True))
            
            #如有信号数据,将保存到数据库中
            if len(update_requsts)>0:
                update_result = DB_CONN[\'boll\'].bulk_write(update_requsts,ordered=False)
                print(\'%s ,upserted:%4d,modified:%4d\'%(code,update_result.upserted_count,update_result.modified_count),flush=True)

        except :
            traceback.print_exc()
if __name__ == "__main__":
    compute_boll(\'2019-01-01\',\'2019-12-31\')

提供外部接口,供回测

def is_boll_break_down(code,date):
    """
    查询某只股票是否在某日出现了突破下轨信号
    :param code: 股票代码
    :param date: 日期
    :return: True - 出现了突破下轨信号,False - 没有出现突破下轨信号
    """
    count = DB_CONN[\'boll\'].count({\'code\':code,\'date\':date,\'direction\':\'down\'})
    return count==1

def is_boll_break_up(code,date):
    """
    查询某只股票是否在某日出现了突破上轨信号
    :param code: 股票代码
    :param date: 日期
    :return: True - 出现了突破上轨信号,False - 没有出现突破上轨信号
    """
    count = DB_CONN[\'boll\'].count({\'code\':code,\'date\':date,\'direction\':\'up\'})
    return count == 1
回测结果展示


获取数据的周期,可以是日、周、月甚至是分钟;上述代码中有N和k两个可变参数

分型(顶分型与底分型)

分型直观展示

#####流程图

代码实现

信号计算

import pandas as pd
import traceback
from pymongo import ASCENDING,UpdateOne
from stock_util import get_all_codes
from database import DB_CONN


def compute_fractal(begin_date,end_date):
    """
    计算给定周期内的分型信号,把结果保存在数据库中
    :param begin_date:开始日期
    :param end_date:结束日期
    """
    #获取所有股票代码
    codes = get_all_codes()

    #计算每只股票的信号
    for code in codes:
        try:
            #获取后复权的价格,使用后复权价格计算分型信号
            daily_cursor = DB_CONN[\'daily_hfq\'].find(
                {\'code\':code,\'date\':{\'$gte\':begin_date,\'$lte\':end_date},\'index\':False},
                sort=[(\'date\',ASCENDING)],
                projection={\'date\':True,\'high\':True,\'low\':True,\'_id\':False})
            df_daily = pd.DataFrame([daily for daily in daily_cursor])

            #将日期设置为索引
            df_daily.set_index([\'date\'],1,inplace=True)
            #通过移位将前两天和后两天对齐到中间一天
            df_daily_shift_1 = df_daily.shift(1)
            df_daily_shift_2 = df_daily.shift(2)
            df_daily_shift_3 = df_daily.shift(3)
            df_daily_shift_4 = df_daily.shift(4)
            #顶分型,中间日的最高价既大于前两天的价格,又大于后两天的价格
            df_daily[\'up\'] = (df_daily_shift_3[\'high\'] > df_daily_shift_1[\'high\']) & \
                            (df_daily_shift_3[\'high\'] > df_daily_shift_2[\'high\']) & \
                            (df_daily_shift_3[\'high\'] > df_daily_shift_4[\'high\']) & \
                            (df_daily_shift_3[\'high\'] > df_daily[\'high\'])
            #底分型,中间日的最低价既低于前两天的最低价,又低于后两天的最低价
            df_daily[\'down\'] = (df_daily_shift_3[\'low\'] < df_daily_shift_1[\'low\']) &\
                            (df_daily_shift_3[\'low\'] < df_daily_shift_2[\'low\']) &\
                            (df_daily_shift_3[\'low\'] < df_daily_shift_4[\'low\']) &\
                            (df_daily_shift_3[\'low\'] < df_daily[\'low\'])
            #只保留出现顶分型和低分型的日期
            df_daily = df_daily[(df_daily[\'up\'] | df_daily[\'down\'])]
            #抛掉不用的数据
            df_daily.drop([\'high\',\'low\'],1,inplace = True)
            print(df_daily)

            #将信号保存在数据库中
            update_requests = []
            for date in df_daily.index:
                doc = {
                    \'code\':code,
                    \'date\':date,
                    #up:顶分型,down:低分型
                    \'direction\':\'up\' if df_daily.loc[date][\'up\'] else \'down\'
                }
                #保存时以code、date和direction做条件,那么就需要在这三个字段上建立索引
                #db.fractal_signal.createIndex({\'code\':1,\'date\':1,\'direction\':1})
                update_requests.append(
                    UpdateOne(doc,{\'$set\':doc},upsert=True))
            
            if len(update_requests)>0:
                update_result = DB_CONN[\'fractal_signal\'].bulk_write(update_requests,ordered=False)
                print(\'%s ,upserted:%4d,modified:%4d\' %(code,update_result.upserted_count,update_result.modified_count),flush=True)
        except :
            print(\'错误发生:%s\'%code,flush=True)
            traceback.print_exc()
if __name__ == "__main__":
    compute_fractal(\'2019-01-01\',\'2019-12-31\')

提供外部接口,供回测


#提供外部接口
def is_fractal_up(code,date):
    count = DB_CONN[\'fractal_signal\'].count({\'code\':code,\'date\':date,\'direction\':\'up\'})
    return count == 1
    """
    查询某只股票在某个日期是否出现顶分型信号
    :param code: 股票代码
    :param date: 日期
    :return: True - 出现顶分型信号,False - 没有出现顶分型信号
    """

def is_fractal_down(code,date):
    """
    查询某只股票在某个日期是否出现底分型信号
    :param code: 股票代码
    :param date: 日期
    :return: True - 出现底分型信号,False - 没有出现底分型信号
    """
    count = DB_CONN[\'fractal-signal\'].count({\'code\':code,\'date\':date,\'direction\':\'down\'})
    return count == 1
回测结果展示


检测宽度:N \(\in (3,5,7,9···)\);符合对称性

就目前的数据利用这四种信号得到回测结果都不尽人意,不排除本地获取、处理数据的问题,但对股票的选股、信号的计算、简单的回测、简单的持仓管理都有了简单的了解。

分类:

技术点:

相关文章:

  • 2021-11-13
  • 2021-06-11
  • 2021-11-30
  • 2021-11-29
  • 2022-12-23
  • 2021-12-20
  • 2021-12-30
  • 2021-11-09
猜你喜欢
  • 2021-10-31
  • 2021-11-29
  • 2021-06-03
  • 2021-11-29
  • 2022-03-05
  • 2021-12-16
  • 2022-12-23
相关资源
相似解决方案