这里给出的模型是我没有进行参数调优的,所以效果不是最终的理想状态。
#%%
! pip install tushare
#%% md
https://mp.weixin.qq.com/s?__biz=Mzg4MDE3OTA5NA==&mid=2247491137&idx=1&sn=9506137b0ba2f1b59fadae117aaa97dd&source=41#wechat_redirect
x = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(1, activation= \'sigmoid\'))(x)
#%%
# 导入包
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import tushare as ts
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
#根据你的特征输入的大小调节
#参数设置/parameter setting
timesteps = seq_length = 20 #时间窗/window length
data_dim = 7 #输入数据维度/dimension of input data
output_dim = 1 #输出数据维度/dimension of output data
ts_code = \'000001.SZ\'
start_date = \'20120101\'
end_date = \'20200301\'
# A1
# 设置显示的最大列、宽等参数,消掉打印不完全中间的省略号
# pd.set_option(\'display.max_columns\', 1000)
# pd.set_option(\'display.width\', 1000)
# pd.set_option(\'display.max_colwidth\', 1000)
# pd.set_option(\'display.height\', 1000)
#显示所有列
pd.set_option(\'display.max_columns\', None)
#显示所有行
pd.set_option(\'display.max_rows\', None)
#数据准备/data preparation
#变量选取Open,High,Low,Close,Volume等
pro = ts.pro_api(\'XXXXXXXXXXXXXXXXXXXXXXXXXXXX\') #token可以在新版tushare的网站上找到
stock_data = pro.query(\'daily\',ts_code = \'000001.SZ\', start_date = \'20120101\', end_date = \'20200301\')
stock_data = stock_data[::-1] #倒序,使日期靠前的排在前面
print(stock_data.head(5))
stock_data.reset_index(drop=True, inplace=True) #把每行的索引改为“0、1、2……”
xy = stock_data[[\'open\',\'close\',\'high\',\'low\',\'vol\',\'pct_chg\',\'amount\']] #选取需要的features
xy = np.array(xy.values) #转为array
#%% md
开始切分
#%%
#切分训练集合测试集/split to train and testing
train_size = int(len(xy) * 0.7) #训练集长度
test_size = len(xy) - train_size #测试集长度
xy_train, xy_test = np.array(xy[0:train_size]),np.array(xy[train_size:len(xy)]) #划分训练集、测试集
scaler = MinMaxScaler()
xy_train_new = scaler.fit_transform(xy_train) #预处理,按列操作,每列最小值为0,最大值为1
x_new = xy_train_new[:,0:] #features
y_new = xy_train_new[:,1] * 10 #labels,适当放大方便训练
x = x_new
y = y_new
dataX = []
dataY = []
for i in range(0, len(y) - seq_length):
_x = x[i:i + seq_length]
_y = y[i + seq_length] # Next close price
# print(_x, "->", _y)
dataX.append(_x)
dataY.append(_y)
#处理数据shape,准备进入神经网络层
x_real = np.vstack(dataX).reshape(-1,seq_length,data_dim)
y_real= np.vstack(dataY).reshape(-1,output_dim)
print(x_real.shape)
print(y_real.shape)
dataX = x_real
dataY = y_real
trainX, trainY = dataX, dataY
xy_test_new = scaler.transform(xy_test) #使用训练集的scaler预处理测试集的数据
x_new = xy_test_new[:,0:]
y_new = xy_test_new[:,1] * 10
x = x_new
y = y_new
dataX = []
dataY = []
for i in range(0, len(y) - seq_length):
_x = x[i:i + seq_length]
_y = y[i + seq_length] # Next price change
# print(_x, "->", _y)
dataX.append(_x)
dataY.append(_y)
#处理数据shape,准备进入神经网络层
x_real = np.vstack(dataX).reshape(-1,seq_length,data_dim)
y_real= np.vstack(dataY).reshape(-1,output_dim)
print(x_real.shape)
print(y_real.shape)
dataX = x_real
dataY = y_real
testX, testY = dataX, dataY
# print(\'训练集第一个窗口数据\')
# print(trainX[0])
# print(\'训练集第一个回归数据\')
# print(trainY[0])
# print(\'验证集第一个窗口数据\')
# print(testX[0])
# print(\'验证集第一个回归数据\')
# print(testY[0])
#%% md
建立模型
#%%
# B1
import os
import tensorflow as tf
inputs = tf.keras.Input(shape=(seq_length, data_dim))
# x = tf.keras.layers.Dense(units=10,activation=\'relu\')(i)
x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=50,return_sequences=True))(inputs)
# x = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(1, activation= \'sigmoid\'))
x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=50))(x)
# x = tf.keras.layers.Dropout(0.2)(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(20)(x)
output = tf.keras.layers.Dense(1)(x)
model = tf.keras.Model(inputs=inputs, outputs=output)
#%% md
模型训练
#%%
import os
import time
log_dir= \'./drive/My Drive/callsbacks_b1_chun\'
if not os.path.exists(log_dir):
os.mkdir(log_dir)
output_model_file = os.path.join(log_dir,"best2.h5")
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir)
]
# opt = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(optimizer=\'adam\', loss=\'mean_squared_error\',metrics=[\'mae\',\'mse\'])
# model.compile(optimizer=\'rmsprop\', loss=\'mae\',metrics=[\'mae\',\'mse\'])
print(model.summary())
start = time.time()
# history = model.fit(trainX, trainY, batch_size=256, epochs=200, verbose=2, validation_split=0.1)
history = model.fit(trainX, trainY, batch_size=256, epochs=200, verbose=2, validation_split=0.1,callbacks=callbacks)
elapsed = (time.time() - start)
print(elapsed)
#%% md
模型的评估
#%%
# Plot training & validation loss values绘制训练和验证集的损失函数
plt.plot(history.history[\'loss\'])
plt.plot(history.history[\'val_loss\'])
plt.title(\'Model loss\')
plt.ylabel(\'Loss\')
plt.xlabel(\'Epoch\')
plt.legend([\'Train\', \'Test\'], loc=\'upper left\')
plt.show()
#%% md
训练样本的评估
#%%
trainPredict2 = model.predict(trainX) #查看⚠️训练结果
# trainPredict2_2 = scaler.inverse_transform(trainPredict2)
# trainY2=scaler.inverse_transform(trainY)
trainPredict2_2 = trainPredict2 / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
trainY2 = trainY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
#下面画出收盘价走势
plt.subplots(figsize=(16, 6))
# ax.plot(trainY2,color=\'blue\')
# ax.plot(trainPredict2_2,color=\'orange\')
plt.plot(trainY2, color=\'blue\', label=\'REAL Stock Price\')
plt.plot(trainPredict2_2, color=\'orange\', label=\'PREDICT Stock Price\')
plt.title(\' TRAIN CLOSE PRICE\')
plt.xlabel(\'Time\')
plt.ylabel(\'Stock Price\')
plt.legend()
plt.show()
#%%
plt.figure(figsize=(8,8)) #画布大小
plt.xlim((5,30)) #x坐标范围
plt.ylim((5,30)) #y坐标范围
plt.scatter(trainY2, trainPredict2_2) #理想情况下散点应该分布在斜率为1的直线周围
plt.ylabel(\'prediction\')
plt.xlabel(\'label\')
print(\'训练平均误差:\',np.mean((trainPredict2_2 - trainY2) / trainY2 * 100)) #平均误差(%)
print(\'训练最大误差:\',np.max((trainPredict2_2 - trainY2) / trainY2 * 100)) #最大误差(%)
print(\'训练最小误差:\',np.min((trainPredict2_2 - trainY2) / trainY2 * 100)) #最小误差(%)
#计算误差小于5%的比例
count = 0
for i in range(len(trainY2)):
if abs(trainPredict2_2[i] - trainY2[i]) / trainY2[i] * 100 <= 5:
count += 1
count = count / len(trainY2) * 100
print(\'训练误差小于5%的比例:\',count)
#%% md
测试样本的评估
#%%
testPredict2 = model.predict(testX) #查看测试结果
testPredict2_2 = testPredict2 / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
testY2 = testY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
# ⚠️⚠️⚠️因为本身是模型预测趋势,而对于转折点对预测还是不高,所以在验证集上表现不是特别好⚠️蓝线和黄线是独立对没有联系对,也就是说蓝线是我们上帝视角提前获取对未来数据,而黄线是模型自己拟合出来对数据
print(model.evaluate(testX, testPredict2_2))
#下面画出收盘价走势
fig, ax = plt.subplots(figsize=(16, 6))
# ax.plot(testY2,color=\'blue\')
# ax.plot(testPredict2_2,color=\'orange\')
# Visualising the results 使用Matplotlib将预测股价和实际股价的结果可视化。
plt.plot(testY2, color=\'blue\', label=\'REAL Stock Price\')
plt.plot(testPredict2_2, color=\'orange\', label=\'PREDICT Stock Price\')
plt.title(\'TEST CLOSE PRICE\')
plt.xlabel(\'Time\')
plt.ylabel(\'Stock Price\')
plt.legend()
plt.show()
#%%
plt.figure(figsize=(8,8)) #画布大小
plt.xlim((5,20)) #x坐标范围
plt.ylim((5,20)) #y坐标范围
plt.scatter(testY2, testPredict2_2) #理想情况下散点应该分布在斜率为1的直线周围
plt.ylabel(\'prediction\')
plt.xlabel(\'label\')
print(\'测试平均误差:\',np.mean((testPredict2_2 - testY2) / testY2 * 100)) #平均误差(%)
print(\'测试最大误差:\',np.max((testPredict2_2 - testY2) / testY2 * 100)) #最大误差(%)
print(\'测试最小误差:\',np.min((testPredict2_2 - testY2) / testY2 * 100)) #最小误差(%)
#计算误差小于5%的比例
count = 0
for i in range(len(testY2)):
if abs(testPredict2_2[i] - testY2[i]) / testY2[i] * 100 <= 5:
count += 1
count = count / len(testY2) * 100
print(\'测试误差小于5%的比例:\',count)
#%% md
对模型整体训练对评估
#%%
# 模型的评估loss的值越低说明模型拟合的效果越好,LSTM模型主要采用RMSE作为评价标准
from sklearn.metrics import mean_squared_error,mean_absolute_error,r2_score
from math import sqrt
#回归评价指标
# calculate MSE 均方误差
mse=mean_squared_error(testY2,testPredict2_2)
# calculate RMSE 均方根误差
rmse = sqrt(mean_squared_error(testY2, testPredict2_2))
#calculate MAE 平均绝对误差
mae=mean_absolute_error(testY2,testPredict2_2)
#calculate R square
r_square=r2_score(testY2,testPredict2_2)
print(\'均方误差mse: %.6f\' % mse)
print(\'均方根误差rmse: %.6f\' % rmse)
print(\'平均绝对误差mae: %.6f\' % mae)
print(\'R_square: %.6f\' % r_square)
#%% md
股票评价
#%%
#计算对转折点的预测正确率
correct = np.zeros(len(testPredict2_2))
for i in range(1, len(testPredict2_2)):
if np.sign(testPredict2_2[i] - testPredict2_2[i-1]) == np.sign(testY2[i] - testY2[i-1]): #如果对涨或跌的判断准确,这里用正负符号判断###########################################################
correct[i] = 1 #就加1
accuracy = np.sum(correct) / len(correct) * 100
print(\'对转折点的预测正确率:\',accuracy)
#如果对明天的预测价格高于今天的收盘价,就买进并持有一天,计算能挣多少钱
count = 0
for i in range(1, len(testPredict2_2)):
if testPredict2_2[i] >= testY2[i-1]:
count = count + (testY2[i] - testY2[i-1])
print(\'\n\n如果对明天的预测价格高于今天的收盘价,就买进并持有一天,能挣{}元\'.format(count))
#如果对明天的预测价格高于今天的收盘价,就买进并持有十一天,计算能挣多少钱
count = 0
for i in range(1, len(testPredict2_2)):
if testPredict2_2[i] >= testY2[i-1]:
if i+10 <len(testPredict2_2):
count = count + (testY2[i+10] - testY2[i-1])
print(\'如果对明天的预测价格高于今天的收盘价,就买进并持有十一天,能挣{}元\'.format(count))
#最理想的状况下,能挣多少钱
count = 0
for i in range(1, len(testPredict2_2)):
if testY2[i] >= testY2[i-1]:
count = count + (testY2[i] - testY2[i-1])
print(\'最理想的状况下,能挣{}元\'.format(count))
#%%
#如果对明天的预测价格高于今天的收盘价,就买进并持有n天,计算能挣多少钱
print(len(testPredict2_2))
maxmoneyday = 0
maxmoney = 0
for n in range(30):
count = 0
for i in range(1, len(testPredict2_2)):
# for n in range(len(testPredict2_2)-i):
# print(n)
if testPredict2_2[i] >= testY2[i-1]:
if i+n <len(testPredict2_2):
count = count + (testY2[i+n] - testY2[i-1])
print(\'在所有的这一段时间中,如果明天的预测价格高于今天的收盘价,就买进一股并持有{}天后卖出,总共能挣{}元\'.format(n+1,count))
if maxmoney <count:
maxmoney = count
maxmoneyday = n+1
print(\'最佳的固定持有相同天数的方式投资赚了{},持有{}天\'.format(maxmoney,maxmoneyday))
#%%
# 这里是导出可视化对界面
%reload_ext tensorboard
%tensorboard --logdir \'./drive/My Drive/callsbacks_b1_chun\'
#%%
# 保存模型
# tf.saved_model.save(model, "Bilstm七输入0408.h5")
#%%
# 载入模型
# model = tf.saved_model.load(\'Bilstm七输入0402.h5\')
#%% md
日线行情输出参数
名称 类型 描述
ts_code str 股票代码
trade_date str 交易日期
open float 开盘价
high float 最高价
low float 最低价
close float 收盘价
pre_close float 昨收价
change float 涨跌额
pct_chg float 涨跌幅 (未复权,如果是复权请用 通用行情接口 )
vol float 成交量 (手)
amount float 成交额 (千元)
---
每日指标输出参数
名称 类型 描述
ts_code str TS股票代码
trade_date str 交易日期
close float 当日收盘价
turnover_rate float 换手率(%)
turnover_rate_f float 换手率(自由流通股)
volume_ratio float 量比
pe float 市盈率(总市值/净利润, 亏损的PE为空)
pe_ttm float 市盈率(TTM,亏损的PE为空)
pb float 市净率(总市值/净资产)
ps float 市销率
ps_ttm float 市销率(TTM)
dv_ratio float 股息率 (%)
dv_ttm float 股息率(TTM)(%)
total_share float 总股本 (万股)
float_share float 流通股本 (万股)
free_share float 自由流通股本 (万)
total_mv float 总市值 (万元)
circ_mv float 流通市值(万元)
相关文章: